aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2017-09-06 18:58:25 -0700
committerGravatar David Garcia Quintas <dgq@google.com>2017-09-06 18:58:25 -0700
commit4dc3c0bdd33892af5a79964ee3e881c7ad86eb3e (patch)
treeff9e6e138b061a6a770393b5368258c2d647d1df /src
parent243fe9d43c6bed0168670255d8208795c8438a79 (diff)
parent04035de631a6a46c03ebf3e8db2102d5688dc71a (diff)
Merge branch 'master' of github.com:grpc/grpc into filters_prio
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c16
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c22
-rw-r--r--src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c36
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.c11
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c75
-rw-r--r--src/core/lib/debug/stats.c67
-rw-r--r--src/core/lib/debug/stats.h44
-rw-r--r--src/core/lib/debug/stats_data.c (renamed from src/core/lib/support/thd_internal.h)17
-rw-r--r--src/core/lib/debug/stats_data.h47
-rw-r--r--src/core/lib/debug/stats_data.yaml9
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c232
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c8
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c4
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c6
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_posix.c2
-rw-r--r--src/core/lib/iomgr/exec_ctx.h7
-rw-r--r--src/core/lib/iomgr/iocp_windows.c2
-rw-r--r--src/core/lib/iomgr/tcp_posix.c15
-rw-r--r--src/core/lib/profiling/timers.h2
-rw-r--r--src/core/lib/support/block_annotate.h22
-rw-r--r--src/core/lib/surface/call.c70
-rw-r--r--src/core/lib/surface/call.h10
-rw-r--r--src/core/lib/surface/call_log_batch.c2
-rw-r--r--src/core/lib/surface/init.c3
-rw-r--r--src/core/lib/transport/byte_stream.c1
-rw-r--r--src/core/lib/transport/byte_stream.h4
-rw-r--r--src/core/lib/transport/metadata_batch.c2
-rw-r--r--src/core/lib/transport/metadata_batch.h1
-rw-r--r--src/core/lib/transport/static_metadata.c25
-rw-r--r--src/core/lib/transport/static_metadata.h2
-rw-r--r--src/core/tsi/test_creds/BUILD14
-rw-r--r--src/cpp/common/channel_filter.cc2
-rw-r--r--src/cpp/common/channel_filter.h11
-rw-r--r--src/cpp/common/core_codegen.cc4
-rw-r--r--src/cpp/server/server_cc.cc9
-rw-r--r--src/objective-c/!ProtoCompiler-gRPCPlugin.podspec2
-rw-r--r--src/objective-c/!ProtoCompiler.podspec2
-rw-r--r--src/php/ext/grpc/php_grpc.c4
-rw-r--r--src/proto/grpc/health/v1/BUILD10
-rw-r--r--src/proto/grpc/lb/v1/BUILD10
-rw-r--r--src/proto/grpc/reflection/v1alpha/BUILD10
-rw-r--r--src/proto/grpc/status/BUILD10
-rw-r--r--src/proto/grpc/testing/BUILD10
-rw-r--r--src/proto/grpc/testing/duplicate/BUILD10
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
-rw-r--r--src/python/grpcio_testing/grpc_testing/__init__.py289
-rw-r--r--src/python/grpcio_testing/grpc_testing/_common.py68
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/__init__.py20
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_handler.py215
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_rpc.py153
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_server.py149
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py93
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_service.py88
-rw-r--r--src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py74
-rw-r--r--src/python/grpcio_tests/tests/testing/_server_application.py66
-rw-r--r--src/python/grpcio_tests/tests/testing/_server_test.py169
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
-rw-r--r--src/ruby/.rubocop_todo.yml565
-rwxr-xr-xsrc/ruby/end2end/grpc_class_init_client.rb2
-rw-r--r--src/ruby/ext/grpc/rb_call.c3
62 files changed, 2581 insertions, 252 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
index fd0fb41fb9..a50ba09bf5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c
@@ -296,8 +296,6 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx,
static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
pick_first_lb_policy *p = (pick_first_lb_policy *)policy;
- /* Find the number of backend addresses. We ignore balancer
- * addresses, since we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -317,11 +315,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
return;
}
const grpc_lb_addresses *addresses = arg->value.pointer.p;
- size_t num_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (!addresses->addresses[i].is_balancer) ++num_addrs;
- }
- if (num_addrs == 0) {
+ if (addresses->num_addresses == 0) {
// Empty update. Unsubscribe from all current subchannels and put the
// channel in TRANSIENT_FAILURE.
grpc_connectivity_state_set(
@@ -333,9 +327,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
}
if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses",
- (void *)p, (unsigned long)num_addrs);
+ (void *)p, (unsigned long)addresses->num_addresses);
}
- grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs);
+ grpc_subchannel_args *sc_args =
+ gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses);
/* We remove the following keys in order for subchannel keys belonging to
* subchannels point to the same address to match. */
static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS,
@@ -344,7 +339,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
/* Create list of subchannel args for new addresses in \a args. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (addresses->addresses[i].is_balancer) continue;
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
if (addresses->addresses[i].user_data != NULL) {
gpr_log(GPR_ERROR,
"This LB policy doesn't support user data. It will be ignored");
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
index a63bdd933d..866fb9a1eb 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c
@@ -74,9 +74,6 @@ typedef struct round_robin_lb_policy {
bool started_picking;
/** are we shutting down? */
bool shutdown;
- /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be
- * service after this point, the policy will never transition out. */
- bool in_connectivity_shutdown;
/** List of picks that are waiting on connectivity */
pending_pick *pending_picks;
@@ -424,7 +421,6 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_closure *on_complete) {
round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
GPR_ASSERT(!p->shutdown);
- GPR_ASSERT(!p->in_connectivity_shutdown);
if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) {
gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol);
}
@@ -537,7 +533,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked(
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error),
"rr_shutdown");
- p->in_connectivity_shutdown = true;
+ p->shutdown = true;
new_state = GRPC_CHANNEL_SHUTDOWN;
} else if (subchannel_list->num_transient_failures ==
p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */
@@ -741,8 +737,6 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
round_robin_lb_policy *p = (round_robin_lb_policy *)policy;
- /* Find the number of backend addresses. We ignore balancer addresses, since
- * we don't know how to handle them. */
const grpc_arg *arg =
grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES);
if (arg == NULL || arg->type != GRPC_ARG_POINTER) {
@@ -761,12 +755,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
return;
}
grpc_lb_addresses *addresses = arg->value.pointer.p;
- size_t num_addrs = 0;
- for (size_t i = 0; i < addresses->num_addresses; i++) {
- if (!addresses->addresses[i].is_balancer) ++num_addrs;
- }
- rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs);
- if (num_addrs == 0) {
+ rr_subchannel_list *subchannel_list =
+ rr_subchannel_list_create(p, addresses->num_addresses);
+ if (addresses->num_addresses == 0) {
grpc_connectivity_state_set(
exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
@@ -798,9 +789,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
GRPC_ARG_LB_ADDRESSES};
/* Create subchannels for addresses in the update. */
for (size_t i = 0; i < addresses->num_addresses; i++) {
- /* Skip balancer addresses, since we only know how to handle backends. */
- if (addresses->addresses[i].is_balancer) continue;
- GPR_ASSERT(i < num_addrs);
+ // If there were any balancer, we would have chosen grpclb policy instead.
+ GPR_ASSERT(!addresses->addresses[i].is_balancer);
memset(&sc_args, 0, sizeof(grpc_subchannel_args));
grpc_arg addr_arg =
grpc_create_subchannel_address_arg(&addresses->addresses[i].address);
diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
index 56ed4371a9..3ff081a514 100644
--- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
+++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c
@@ -32,6 +32,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
@@ -125,7 +126,6 @@ static const grpc_resolver_vtable fake_resolver_vtable = {
struct grpc_fake_resolver_response_generator {
fake_resolver* resolver; // Set by the fake_resolver constructor to itself.
- grpc_channel_args* next_response;
gpr_refcount refcount;
};
@@ -151,19 +151,26 @@ void grpc_fake_resolver_response_generator_unref(
}
}
-static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg,
- grpc_error* error) {
- grpc_fake_resolver_response_generator* generator =
- (grpc_fake_resolver_response_generator*)arg;
+typedef struct set_response_closure_arg {
+ grpc_closure set_response_closure;
+ grpc_fake_resolver_response_generator* generator;
+ grpc_channel_args* next_response;
+} set_response_closure_arg;
+
+static void set_response_closure_fn(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ set_response_closure_arg* closure_arg = arg;
+ grpc_fake_resolver_response_generator* generator = closure_arg->generator;
fake_resolver* r = generator->resolver;
if (r->next_results != NULL) {
grpc_channel_args_destroy(exec_ctx, r->next_results);
}
- r->next_results = generator->next_response;
+ r->next_results = closure_arg->next_response;
if (r->results_upon_error != NULL) {
grpc_channel_args_destroy(exec_ctx, r->results_upon_error);
}
- r->results_upon_error = grpc_channel_args_copy(generator->next_response);
+ r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response);
+ gpr_free(closure_arg);
fake_resolver_maybe_finish_next_locked(exec_ctx, r);
}
@@ -171,12 +178,15 @@ void grpc_fake_resolver_response_generator_set_response(
grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator,
grpc_channel_args* next_response) {
GPR_ASSERT(generator->resolver != NULL);
- generator->next_response = grpc_channel_args_copy(next_response);
- GRPC_CLOSURE_SCHED(
- exec_ctx, GRPC_CLOSURE_CREATE(set_response_cb, generator,
- grpc_combiner_scheduler(
- generator->resolver->base.combiner)),
- GRPC_ERROR_NONE);
+ set_response_closure_arg* closure_arg = gpr_zalloc(sizeof(*closure_arg));
+ closure_arg->generator = generator;
+ closure_arg->next_response = grpc_channel_args_copy(next_response);
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_INIT(&closure_arg->set_response_closure,
+ set_response_closure_fn, closure_arg,
+ grpc_combiner_scheduler(
+ generator->resolver->base.combiner)),
+ GRPC_ERROR_NONE);
}
static void* response_generator_arg_copy(void* p) {
diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c
index 80eb51ff0d..711938b278 100644
--- a/src/core/ext/transport/chttp2/transport/writing.c
+++ b/src/core/ext/transport/chttp2/transport/writing.c
@@ -154,17 +154,8 @@ static uint32_t target_write_size(grpc_chttp2_transport *t) {
}
// Returns true if initial_metadata contains only default headers.
-//
-// TODO(roth): The fact that we hard-code these particular headers here
-// is fairly ugly. Need some better way to know which headers are
-// default, maybe via a bit in the static metadata table?
static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) {
- int num_default_fields =
- (initial_metadata->idx.named.status != NULL) +
- (initial_metadata->idx.named.content_type != NULL) +
- (initial_metadata->idx.named.grpc_encoding != NULL) +
- (initial_metadata->idx.named.grpc_accept_encoding != NULL);
- return (size_t)num_default_fields == initial_metadata->list.count;
+ return initial_metadata->list.default_count == initial_metadata->list.count;
}
grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index abb558982b..765c13c65e 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -187,9 +187,34 @@ struct stream_obj {
/* Mutex to protect storage */
gpr_mu mu;
+
+ /* Refcount object of the stream */
+ grpc_stream_refcount *refcount;
};
typedef struct stream_obj stream_obj;
+#ifndef NDEBUG
+#define GRPC_CRONET_STREAM_REF(stream, reason) \
+ grpc_cronet_stream_ref((stream), (reason))
+#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
+ grpc_cronet_stream_unref((exec_ctx), (stream), (reason))
+void grpc_cronet_stream_ref(stream_obj *s, const char *reason) {
+ grpc_stream_ref(s->refcount, reason);
+}
+void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s,
+ const char *reason) {
+ grpc_stream_unref(exec_ctx, s->refcount, reason);
+}
+#else
+#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream))
+#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \
+ grpc_cronet_stream_unref((exec_ctx), (stream))
+void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); }
+void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) {
+ grpc_stream_unref(exec_ctx, s->refcount);
+}
+#endif
+
static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx,
struct op_and_state *oas);
@@ -346,13 +371,12 @@ static void remove_from_storage(struct stream_obj *s,
This can get executed from the Cronet network thread via cronet callback
or on the application supplied thread via the perform_stream_op function.
*/
-static void execute_from_storage(stream_obj *s) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) {
gpr_mu_lock(&s->mu);
for (struct op_and_state *curr = s->storage.head; curr != NULL;) {
CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done);
GPR_ASSERT(curr->done == 0);
- enum e_op_result result = execute_stream_op(&exec_ctx, curr);
+ enum e_op_result result = execute_stream_op(exec_ctx, curr);
CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr,
op_result_string(result));
/* if this op is done, then remove it and free memory */
@@ -369,7 +393,6 @@ static void execute_from_storage(stream_obj *s) {
}
}
gpr_mu_unlock(&s->mu);
- grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -377,6 +400,8 @@ static void execute_from_storage(stream_obj *s) {
*/
static void on_failed(bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
@@ -392,7 +417,9 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -400,6 +427,8 @@ static void on_failed(bidirectional_stream *stream, int net_error) {
*/
static void on_canceled(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
@@ -415,7 +444,9 @@ static void on_canceled(bidirectional_stream *stream) {
}
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -423,6 +454,8 @@ static void on_canceled(bidirectional_stream *stream) {
*/
static void on_succeeded(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+
stream_obj *s = (stream_obj *)stream->annotation;
gpr_mu_lock(&s->mu);
bidirectional_stream_destroy(s->cbs);
@@ -430,7 +463,9 @@ static void on_succeeded(bidirectional_stream *stream) {
s->cbs = NULL;
null_and_maybe_free_read_buffer(s);
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport");
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -438,6 +473,7 @@ static void on_succeeded(bidirectional_stream *stream) {
*/
static void on_stream_ready(bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct;
gpr_mu_lock(&s->mu);
@@ -457,7 +493,8 @@ static void on_stream_ready(bidirectional_stream *stream) {
}
}
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -513,14 +550,15 @@ static void on_response_headers_received(
s->state.pending_read_from_cronet = true;
}
gpr_mu_unlock(&s->mu);
+ execute_from_storage(&exec_ctx, s);
grpc_exec_ctx_finish(&exec_ctx);
- execute_from_storage(s);
}
/*
Cronet callback
*/
static void on_write_completed(bidirectional_stream *stream, const char *data) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data);
gpr_mu_lock(&s->mu);
@@ -530,7 +568,8 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
}
s->state.state_callback_received[OP_SEND_MESSAGE] = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -538,6 +577,7 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) {
*/
static void on_read_completed(bidirectional_stream *stream, char *data,
int count) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
stream_obj *s = (stream_obj *)stream->annotation;
CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
count);
@@ -563,14 +603,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data,
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
} else {
null_and_maybe_free_read_buffer(s);
s->state.rs.read_stream_closed = true;
gpr_mu_unlock(&s->mu);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -625,12 +666,11 @@ static void on_response_trailers_received(
s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true;
gpr_mu_unlock(&s->mu);
- grpc_exec_ctx_finish(&exec_ctx);
} else {
gpr_mu_unlock(&s->mu);
- grpc_exec_ctx_finish(&exec_ctx);
- execute_from_storage(s);
+ execute_from_storage(&exec_ctx, s);
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
/*
@@ -1313,6 +1353,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
const void *server_data, gpr_arena *arena) {
stream_obj *s = (stream_obj *)gs;
+
+ s->refcount = refcount;
+ GRPC_CRONET_STREAM_REF(s, "cronet transport");
memset(&s->storage, 0, sizeof(s->storage));
s->storage.head = NULL;
memset(&s->state, 0, sizeof(s->state));
@@ -1370,7 +1413,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
stream_obj *s = (stream_obj *)gs;
add_to_storage(s, op);
- execute_from_storage(s);
+ execute_from_storage(exec_ctx, s);
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
diff --git a/src/core/lib/debug/stats.c b/src/core/lib/debug/stats.c
new file mode 100644
index 0000000000..4dbd94c724
--- /dev/null
+++ b/src/core/lib/debug/stats.c
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/debug/stats.h"
+
+#include <inttypes.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/support/string.h"
+
+grpc_stats_data *grpc_stats_per_cpu_storage = NULL;
+static size_t g_num_cores;
+
+void grpc_stats_init(void) {
+ g_num_cores = GPR_MAX(1, gpr_cpu_num_cores());
+ grpc_stats_per_cpu_storage =
+ gpr_zalloc(sizeof(grpc_stats_data) * g_num_cores);
+}
+
+void grpc_stats_shutdown(void) { gpr_free(grpc_stats_per_cpu_storage); }
+
+void grpc_stats_collect(grpc_stats_data *output) {
+ memset(output, 0, sizeof(*output));
+ for (size_t core = 0; core < g_num_cores; core++) {
+ for (size_t i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) {
+ output->counters[i] += gpr_atm_no_barrier_load(
+ &grpc_stats_per_cpu_storage[core].counters[i]);
+ }
+ }
+}
+
+char *grpc_stats_data_as_json(const grpc_stats_data *data) {
+ gpr_strvec v;
+ char *tmp;
+ bool is_first = true;
+ gpr_strvec_init(&v);
+ gpr_strvec_add(&v, gpr_strdup("{"));
+ for (size_t i = 0; i < GRPC_STATS_COUNTER_COUNT; i++) {
+ gpr_asprintf(&tmp, "%s\"%s\": %" PRIdPTR, is_first ? "" : ", ",
+ grpc_stats_counter_name[i], data->counters[i]);
+ gpr_strvec_add(&v, tmp);
+ is_first = false;
+ }
+ gpr_strvec_add(&v, gpr_strdup("}"));
+ tmp = gpr_strvec_flatten(&v, NULL);
+ gpr_strvec_destroy(&v);
+ return tmp;
+}
diff --git a/src/core/lib/debug/stats.h b/src/core/lib/debug/stats.h
new file mode 100644
index 0000000000..563b108dff
--- /dev/null
+++ b/src/core/lib/debug/stats.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_DEBUG_STATS_H
+#define GRPC_CORE_LIB_DEBUG_STATS_H
+
+#include <grpc/support/atm.h>
+#include "src/core/lib/debug/stats_data.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+typedef struct grpc_stats_data {
+ gpr_atm counters[GRPC_STATS_COUNTER_COUNT];
+} grpc_stats_data;
+
+extern grpc_stats_data *grpc_stats_per_cpu_storage;
+
+#define GRPC_THREAD_STATS_DATA(exec_ctx) \
+ (&grpc_stats_per_cpu_storage[(exec_ctx)->starting_cpu])
+
+#define GRPC_STATS_INC_COUNTER(exec_ctx, ctr) \
+ (gpr_atm_no_barrier_fetch_add( \
+ &GRPC_THREAD_STATS_DATA((exec_ctx))->counters[(ctr)], 1))
+
+void grpc_stats_init(void);
+void grpc_stats_shutdown(void);
+void grpc_stats_collect(grpc_stats_data *output);
+char *grpc_stats_data_as_json(const grpc_stats_data *data);
+
+#endif
diff --git a/src/core/lib/support/thd_internal.h b/src/core/lib/debug/stats_data.c
index cc468c7846..2203358a7e 100644
--- a/src/core/lib/support/thd_internal.h
+++ b/src/core/lib/debug/stats_data.c
@@ -1,6 +1,5 @@
/*
- *
- * Copyright 2015 gRPC authors.
+ * Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,12 +12,14 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
-#ifndef GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H
-#define GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H
-
-/* Internal interfaces between modules within the gpr support library. */
+/*
+ * Automatically generated by tools/codegen/core/gen_stats_data.py
+ */
-#endif /* GRPC_CORE_LIB_SUPPORT_THD_INTERNAL_H */
+#include "src/core/lib/debug/stats_data.h"
+const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
+ "client_calls_created", "server_calls_created", "syscall_write",
+ "syscall_read", "syscall_poll", "syscall_wait",
+};
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
new file mode 100644
index 0000000000..c9c2f65c30
--- /dev/null
+++ b/src/core/lib/debug/stats_data.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Automatically generated by tools/codegen/core/gen_stats_data.py
+ */
+
+#ifndef GRPC_CORE_LIB_DEBUG_STATS_DATA_H
+#define GRPC_CORE_LIB_DEBUG_STATS_DATA_H
+
+typedef enum {
+ GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED,
+ GRPC_STATS_COUNTER_SERVER_CALLS_CREATED,
+ GRPC_STATS_COUNTER_SYSCALL_WRITE,
+ GRPC_STATS_COUNTER_SYSCALL_READ,
+ GRPC_STATS_COUNTER_SYSCALL_POLL,
+ GRPC_STATS_COUNTER_SYSCALL_WAIT,
+ GRPC_STATS_COUNTER_COUNT
+} grpc_stats_counters;
+#define GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CLIENT_CALLS_CREATED)
+#define GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SERVER_CALLS_CREATED)
+#define GRPC_STATS_INC_SYSCALL_WRITE(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_WRITE)
+#define GRPC_STATS_INC_SYSCALL_READ(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_READ)
+#define GRPC_STATS_INC_SYSCALL_POLL(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_POLL)
+#define GRPC_STATS_INC_SYSCALL_WAIT(exec_ctx) \
+ GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_SYSCALL_WAIT)
+extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT];
+
+#endif /* GRPC_CORE_LIB_DEBUG_STATS_DATA_H */
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
new file mode 100644
index 0000000000..8afe48f5cd
--- /dev/null
+++ b/src/core/lib/debug/stats_data.yaml
@@ -0,0 +1,9 @@
+# Stats data declaration
+# use tools/codegen/core/gen_stats_data.py to turn this into stats_data.h
+
+- counter: client_calls_created
+- counter: server_calls_created
+- counter: syscall_write
+- counter: syscall_read
+- counter: syscall_poll
+- counter: syscall_wait
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 3ee646fa36..7f053fa728 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -39,6 +39,7 @@
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
@@ -48,7 +49,60 @@
#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
-static int g_epfd;
+
+/*******************************************************************************
+ * Singleton epoll set related fields
+ */
+
+#define MAX_EPOLL_EVENTS 100
+#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
+
+/* NOTE ON SYNCHRONIZATION:
+ * - Fields in this struct are only modified by the designated poller. Hence
+ * there is no need for any locks to protect the struct.
+ * - num_events and cursor fields have to be of atomic type to provide memory
+ * visibility guarantees only. i.e In case of multiple pollers, the designated
+ * polling thread keeps changing; the thread that wrote these values may be
+ * different from the thread reading the values
+ */
+typedef struct epoll_set {
+ int epfd;
+
+ /* The epoll_events after the last call to epoll_wait() */
+ struct epoll_event events[MAX_EPOLL_EVENTS];
+
+ /* The number of epoll_events after the last call to epoll_wait() */
+ gpr_atm num_events;
+
+ /* Index of the first event in epoll_events that has to be processed. This
+ * field is only valid if num_events > 0 */
+ gpr_atm cursor;
+} epoll_set;
+
+/* The global singleton epoll set */
+static epoll_set g_epoll_set;
+
+/* Must be called *only* once */
+static bool epoll_set_init() {
+ g_epoll_set.epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (g_epoll_set.epfd < 0) {
+ gpr_log(GPR_ERROR, "epoll unavailable");
+ return false;
+ }
+
+ gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
+ gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
+ gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
+ return true;
+}
+
+/* epoll_set_init() MUST be called before calling this. */
+static void epoll_set_shutdown() {
+ if (g_epoll_set.epfd >= 0) {
+ close(g_epoll_set.epfd);
+ g_epoll_set.epfd = -1;
+ }
+}
/*******************************************************************************
* Fd Declarations
@@ -122,7 +176,7 @@ struct grpc_pollset {
bool kicked_without_poller;
/* Set to true if the pollset is observed to have no workers available to
- * poll */
+ poll */
bool seen_inactive;
bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
@@ -228,7 +282,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET),
.data.ptr = new_fd};
- if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
+ if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
}
@@ -326,7 +380,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
+
+/* The designated poller */
static gpr_atm g_active_poller;
+
static pollset_neighbourhood *g_neighbourhoods;
static size_t g_num_neighbourhoods;
@@ -380,7 +437,8 @@ static grpc_error *pollset_global_init(void) {
if (err != GRPC_ERROR_NONE) return err;
struct epoll_event ev = {.events = (uint32_t)(EPOLLIN | EPOLLET),
.data.ptr = &global_wakeup_fd};
- if (epoll_ctl(g_epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd, &ev) != 0) {
+ if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
+ &ev) != 0) {
return GRPC_OS_ERROR(errno, "epoll_ctl");
}
g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS);
@@ -497,8 +555,6 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
GPR_TIMER_END("pollset_shutdown", 0);
}
-#define MAX_EPOLL_EVENTS 100
-
static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
gpr_timespec now) {
gpr_timespec timeout;
@@ -517,56 +573,90 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
return millis >= 1 ? millis : 1;
}
-static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- gpr_timespec now, gpr_timespec deadline) {
- struct epoll_event events[MAX_EPOLL_EVENTS];
- static const char *err_desc = "pollset_poll";
-
- GPR_TIMER_BEGIN("pollset_epoll", 0);
-
- int timeout = poll_deadline_to_millis_timeout(deadline, now);
-
- if (timeout != 0) {
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- }
- int r;
- do {
- GPR_TIMER_BEGIN("epoll_wait", 0);
- r = epoll_wait(g_epfd, events, MAX_EPOLL_EVENTS, timeout);
- GPR_TIMER_END("epoll_wait", 0);
- } while (r < 0 && errno == EINTR);
- if (timeout != 0) {
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- }
+/* Process the epoll events found by do_epoll_wait() function.
+ - g_epoll_set.cursor points to the index of the first event to be processed
+ - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
+ updates the g_epoll_set.cursor
+
+ NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
+ called by g_active_poller thread. So there is no need for synchronization
+ when accessing fields in g_epoll_set */
+static grpc_error *process_epoll_events(grpc_exec_ctx *exec_ctx,
+ grpc_pollset *pollset) {
+ static const char *err_desc = "process_events";
+ grpc_error *error = GRPC_ERROR_NONE;
- if (r < 0) {
- GPR_TIMER_END("pollset_epoll", 0);
- return GRPC_OS_ERROR(errno, "epoll_wait");
- }
+ GPR_TIMER_BEGIN("process_epoll_events", 0);
+ long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
+ long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
+ for (int idx = 0;
+ (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
+ idx++) {
+ long c = cursor++;
+ struct epoll_event *ev = &g_epoll_set.events[c];
+ void *data_ptr = ev->data.ptr;
- grpc_error *error = GRPC_ERROR_NONE;
- for (int i = 0; i < r; i++) {
- void *data_ptr = events[i].data.ptr;
if (data_ptr == &global_wakeup_fd) {
append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
err_desc);
} else {
grpc_fd *fd = (grpc_fd *)(data_ptr);
- bool cancel = (events[i].events & (EPOLLERR | EPOLLHUP)) != 0;
- bool read_ev = (events[i].events & (EPOLLIN | EPOLLPRI)) != 0;
- bool write_ev = (events[i].events & EPOLLOUT) != 0;
+ bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0;
+ bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
+ bool write_ev = (ev->events & EPOLLOUT) != 0;
+
if (read_ev || cancel) {
fd_become_readable(exec_ctx, fd, pollset);
}
+
if (write_ev || cancel) {
fd_become_writable(exec_ctx, fd);
}
}
}
- GPR_TIMER_END("pollset_epoll", 0);
+ gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
+ GPR_TIMER_END("process_epoll_events", 0);
return error;
}
+/* Do epoll_wait and store the events in g_epoll_set.events field. This does not
+ "process" any of the events yet; that is done in process_epoll_events().
+ *See process_epoll_events() function for more details.
+
+ NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
+ (i.e the designated poller thread) will be calling this function. So there is
+ no need for any synchronization when accesing fields in g_epoll_set */
+static grpc_error *do_epoll_wait(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
+ gpr_timespec now, gpr_timespec deadline) {
+ GPR_TIMER_BEGIN("do_epoll_wait", 0);
+
+ int r;
+ int timeout = poll_deadline_to_millis_timeout(deadline, now);
+ if (timeout != 0) {
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ }
+ do {
+ GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
+ r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
+ timeout);
+ } while (r < 0 && errno == EINTR);
+ if (timeout != 0) {
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+ }
+
+ if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
+
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "ps: %p poll got %d events", ps, r);
+ }
+
+ gpr_atm_rel_store(&g_epoll_set.num_events, r);
+ gpr_atm_rel_store(&g_epoll_set.cursor, 0);
+
+ GPR_TIMER_END("do_epoll_wait", 0);
+ return GRPC_ERROR_NONE;
+}
+
static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl, gpr_timespec *now,
gpr_timespec deadline) {
@@ -827,32 +917,55 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
The function pollset_work() may temporarily release the lock (pollset->po.mu)
during the course of its execution but it will always re-acquire the lock and
ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
grpc_pollset_worker worker;
grpc_error *error = GRPC_ERROR_NONE;
static const char *err_desc = "pollset_work";
GPR_TIMER_BEGIN("pollset_work", 0);
- if (pollset->kicked_without_poller) {
- pollset->kicked_without_poller = false;
+ if (ps->kicked_without_poller) {
+ ps->kicked_without_poller = false;
GPR_TIMER_END("pollset_work", 0);
return GRPC_ERROR_NONE;
}
- if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+
+ if (begin_worker(ps, &worker, worker_hdl, &now, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutting_down);
- GPR_ASSERT(!pollset->seen_inactive);
- gpr_mu_unlock(&pollset->mu);
- append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
- err_desc);
- gpr_mu_lock(&pollset->mu);
+ GPR_ASSERT(!ps->shutting_down);
+ GPR_ASSERT(!ps->seen_inactive);
+
+ gpr_mu_unlock(&ps->mu); /* unlock */
+ /* This is the designated polling thread at this point and should ideally do
+ polling. However, if there are unprocessed events left from a previous
+ call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
+ process the pending epoll events.
+
+ The reason for decoupling do_epoll_wait and process_epoll_events is to
+ better distrubute the work (i.e handling epoll events) across multiple
+ threads
+
+ process_epoll_events() returns very quickly: It just queues the work on
+ exec_ctx but does not execute it (the actual exectution or more
+ accurately grpc_exec_ctx_flush() happens in end_worker() AFTER selecting
+ a designated poller). So we are not waiting long periods without a
+ designated poller */
+ if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
+ gpr_atm_acq_load(&g_epoll_set.num_events)) {
+ append_error(&error, do_epoll_wait(exec_ctx, ps, now, deadline),
+ err_desc);
+ }
+ append_error(&error, process_epoll_events(exec_ctx, ps), err_desc);
+
+ gpr_mu_lock(&ps->mu); /* lock */
+
gpr_tls_set(&g_current_thread_worker, 0);
} else {
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
}
- end_worker(exec_ctx, pollset, &worker, worker_hdl);
+ end_worker(exec_ctx, ps, &worker, worker_hdl);
+
gpr_tls_set(&g_current_thread_pollset, 0);
GPR_TIMER_END("pollset_work", 0);
return error;
@@ -1043,7 +1156,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
- close(g_epfd);
+ epoll_set_shutdown();
}
static const grpc_event_engine_vtable vtable = {
@@ -1078,32 +1191,25 @@ static const grpc_event_engine_vtable vtable = {
};
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
+ * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
+ * support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
- if (!explicit_request) {
- return NULL;
- }
-
if (!grpc_has_wakeup_fd()) {
return NULL;
}
- g_epfd = epoll_create1(EPOLL_CLOEXEC);
- if (g_epfd < 0) {
- gpr_log(GPR_ERROR, "epoll unavailable");
+ if (!epoll_set_init()) {
return NULL;
}
fd_global_init();
if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
- close(g_epfd);
fd_global_shutdown();
+ epoll_set_shutdown();
return NULL;
}
- gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
-
return &vtable;
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index f2f3e15704..e2e3cd9003 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -40,6 +40,7 @@
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
@@ -1286,7 +1287,8 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
}
/* NOTE: This function may modify 'now' */
-static bool acquire_polling_lease(grpc_pollset_worker *worker,
+static bool acquire_polling_lease(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_worker *worker,
polling_island *pi, gpr_timespec deadline,
gpr_timespec *now) {
bool is_lease_acquired = false;
@@ -1306,6 +1308,7 @@ static bool acquire_polling_lease(grpc_pollset_worker *worker,
} else {
struct timespec sigwait_timeout = millis_to_timespec(timeout_ms);
GRPC_SCHEDULING_START_BLOCKING_REGION;
+ GRPC_STATS_INC_SYSCALL_WAIT(exec_ctx);
ret = sigtimedwait(&g_wakeup_sig_set, NULL, &sigwait_timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
}
@@ -1378,7 +1381,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
sigset_t *sig_mask, grpc_error **error) {
/* Only g_max_pollers_per_pi threads can be doing polling in parallel.
If we cannot get a lease, we cannot continue to do epoll_pwait() */
- if (!acquire_polling_lease(worker, pi, deadline, &now)) {
+ if (!acquire_polling_lease(exec_ctx, worker, pi, deadline, &now)) {
return;
}
@@ -1391,6 +1394,7 @@ static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
GRPC_SCHEDULING_START_BLOCKING_REGION;
+ GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
ep_rv =
epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
GRPC_SCHEDULING_END_BLOCKING_REGION;
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 07c8eadf4f..ddb8c74d2d 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -41,6 +41,7 @@
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
@@ -776,6 +777,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps,
GRPC_SCHEDULING_START_BLOCKING_REGION;
acquire_epoll_lease(eps);
+ GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
release_epoll_lease(eps);
GRPC_SCHEDULING_END_BLOCKING_REGION;
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 770d1fd0a9..0e42f76af3 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -37,6 +37,7 @@
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/is_epollexclusive_available.h"
@@ -49,7 +50,7 @@
#include "src/core/lib/support/spinlock.h"
/*******************************************************************************
- * Pollset-set sibling link
+ * Polling object
*/
typedef enum {
@@ -814,6 +815,7 @@ static grpc_error *pollset_epoll(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
int r;
do {
+ GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
r = epoll_wait(p->epfd, pollset->events, MAX_EPOLL_EVENTS, timeout);
} while (r < 0 && errno == EINTR);
if (timeout != 0) {
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 070d75e42a..59c7cdc285 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -39,6 +39,7 @@
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
@@ -1236,6 +1237,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
g_current_thread_polling_island = pi;
GRPC_SCHEDULING_START_BLOCKING_REGION;
+ GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
ep_rv =
epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
GRPC_SCHEDULING_END_BLOCKING_REGION;
@@ -1728,9 +1730,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux(
}
if (!is_grpc_wakeup_signal_initialized) {
- /* TODO(ctiller): when other epoll engines are ready, remove the true || to
- * force this to be explitly chosen if needed */
- if (true || explicit_request) {
+ if (explicit_request) {
grpc_use_signal(SIGRTMIN + 6);
} else {
return NULL;
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 9472a8e520..fbd265f3ce 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -36,6 +36,7 @@
#include <grpc/support/tls.h>
#include <grpc/support/useful.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_cv.h"
@@ -983,6 +984,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
/* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
even going into the blocking annotation if possible */
GRPC_SCHEDULING_START_BLOCKING_REGION;
+ GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
r = grpc_poll_function(pfds, pfd_count, timeout);
GRPC_SCHEDULING_END_BLOCKING_REGION;
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 91f8cd5482..424b40e425 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -64,8 +64,8 @@ typedef struct {
} event_engine_factory;
static const event_engine_factory g_factories[] = {
- {"epollsig", grpc_init_epollsig_linux},
{"epoll1", grpc_init_epoll1_linux},
+ {"epollsig", grpc_init_epollsig_linux},
{"epoll-threadpool", grpc_init_epoll_thread_pool_linux},
{"epoll-limited", grpc_init_epoll_limited_pollers_linux},
{"poll", grpc_init_poll_posix},
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index a0d2a965d5..c89792c8c4 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -19,6 +19,7 @@
#ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
+#include <grpc/support/cpu.h>
#include "src/core/lib/iomgr/closure.h"
/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */
@@ -62,6 +63,7 @@ struct grpc_exec_ctx {
/** last active combiner in the active combiner list */
grpc_combiner *last_combiner;
uintptr_t flags;
+ unsigned starting_cpu;
void *check_ready_to_finish_arg;
bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
@@ -69,7 +71,10 @@ struct grpc_exec_ctx {
/* initializer for grpc_exec_ctx:
prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INITIALIZER(flags, finish_check, finish_check_arg) \
- { GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, finish_check_arg, finish_check }
+ { \
+ GRPC_CLOSURE_LIST_INIT, NULL, NULL, flags, gpr_cpu_current_cpu(), \
+ finish_check_arg, finish_check \
+ }
/* initialize an execution context at the top level of an API call into grpc
(this is safe to use elsewhere, though possibly not as efficient) */
diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.c
index e343f8a794..c082179c0b 100644
--- a/src/core/lib/iomgr/iocp_windows.c
+++ b/src/core/lib/iomgr/iocp_windows.c
@@ -27,6 +27,7 @@
#include <grpc/support/log_windows.h>
#include <grpc/support/thd.h>
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/socket_windows.h"
@@ -65,6 +66,7 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
LPOVERLAPPED overlapped;
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
+ GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
success = GetQueuedCompletionStatus(
g_iocp, &bytes, &completion_key, &overlapped,
deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type)));
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 2f543fd8a9..6c620ca245 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -40,6 +40,7 @@
#include <grpc/support/useful.h>
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
@@ -66,7 +67,6 @@ typedef struct {
grpc_fd *em_fd;
int fd;
bool finished_edge;
- msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
double target_length;
double bytes_read_this_round;
gpr_refcount refcount;
@@ -239,7 +239,6 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
size_t i;
GPR_ASSERT(!tcp->finished_edge);
- GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GPR_TIMER_BEGIN("tcp_continue_read", 0);
@@ -251,13 +250,14 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
- msg.msg_iovlen = tcp->iov_size;
+ msg.msg_iovlen = (msg_iovlen_type)tcp->incoming_buffer->count;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
GPR_TIMER_BEGIN("recvmsg", 0);
do {
+ GRPC_STATS_INC_SYSCALL_READ(exec_ctx);
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
GPR_TIMER_END("recvmsg", read_bytes >= 0);
@@ -361,7 +361,8 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* returns true if done, false if pending; if returning true, *error is set */
#define MAX_WRITE_IOVEC 1000
-static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
+static bool tcp_flush(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
+ grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -403,6 +404,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
GPR_TIMER_BEGIN("sendmsg", 1);
do {
/* TODO(klempner): Cork if this is a partial write */
+ GRPC_STATS_INC_SYSCALL_WRITE(exec_ctx);
sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
} while (sent_length < 0 && errno == EINTR);
GPR_TIMER_END("sendmsg", 0);
@@ -459,7 +461,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
return;
}
- if (!tcp_flush(tcp, &error)) {
+ if (!tcp_flush(exec_ctx, tcp, &error)) {
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
gpr_log(GPR_DEBUG, "write: delayed");
}
@@ -510,7 +512,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
- if (!tcp_flush(tcp, &error)) {
+ if (!tcp_flush(exec_ctx, tcp, &error)) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
@@ -617,7 +619,6 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
tcp->min_read_chunk_size = tcp_min_read_chunk_size;
tcp->max_read_chunk_size = tcp_max_read_chunk_size;
tcp->bytes_read_this_round = 0;
- tcp->iov_size = 1;
tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
diff --git a/src/core/lib/profiling/timers.h b/src/core/lib/profiling/timers.h
index 4d1437f606..7f02b4bf84 100644
--- a/src/core/lib/profiling/timers.h
+++ b/src/core/lib/profiling/timers.h
@@ -94,7 +94,7 @@ class ProfileScope {
public:
ProfileScope(const char *desc, bool important, const char *file, int line)
: desc_(desc) {
- gpr_timer_begin((desc_, important ? 1 : 0, file, line);
+ gpr_timer_begin(desc_, important ? 1 : 0, file, line);
}
~ProfileScope() { gpr_timer_end(desc_, 0, "n/a", 0); }
diff --git a/src/core/lib/support/block_annotate.h b/src/core/lib/support/block_annotate.h
index 0a2cb45018..8e3ef7df65 100644
--- a/src/core/lib/support/block_annotate.h
+++ b/src/core/lib/support/block_annotate.h
@@ -19,15 +19,37 @@
#ifndef GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H
#define GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void gpr_thd_start_blocking_region();
+void gpr_thd_end_blocking_region();
+
+#ifdef __cplusplus
+}
+#endif
+
/* These annotations identify the beginning and end of regions where
the code may block for reasons other than synchronization functions.
These include poll, epoll, and getaddrinfo. */
+#ifdef GRPC_SCHEDULING_MARK_BLOCKING_REGION
+#define GRPC_SCHEDULING_START_BLOCKING_REGION \
+ do { \
+ gpr_thd_start_blocking_region(); \
+ } while (0)
+#define GRPC_SCHEDULING_END_BLOCKING_REGION \
+ do { \
+ gpr_thd_end_blocking_region(); \
+ } while (0)
+#else
#define GRPC_SCHEDULING_START_BLOCKING_REGION \
do { \
} while (0)
#define GRPC_SCHEDULING_END_BLOCKING_REGION \
do { \
} while (0)
+#endif
#endif /* GRPC_CORE_LIB_SUPPORT_BLOCK_ANNOTATE_H */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index a0ac9ae7d5..e68c201134 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -32,6 +32,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/compression/algorithm_metadata.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -144,6 +145,9 @@ typedef struct {
grpc_call *sibling_prev;
} child_call;
+#define RECV_NONE ((gpr_atm)0)
+#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
+
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
@@ -170,9 +174,6 @@ struct grpc_call {
gpr_atm any_ops_sent_atm;
gpr_atm received_final_op_atm;
- /* have we received initial metadata */
- bool has_initial_md_been_received;
-
batch_control *active_batches[MAX_CONCURRENT_BATCHES];
grpc_transport_stream_op_batch_payload stream_op_payload;
@@ -230,7 +231,23 @@ struct grpc_call {
} server;
} final_op;
- void *saved_receiving_stream_ready_bctlp;
+ /* recv_state can contain one of the following values:
+ RECV_NONE : : no initial metadata and messages received
+ RECV_INITIAL_METADATA_FIRST : received initial metadata first
+ a batch_control* : received messages first
+
+ +------1------RECV_NONE------3-----+
+ | |
+ | |
+ v v
+ RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
+ | ^ | ^
+ | | | |
+ +-----2-----+ +-----4-----+
+
+ For 1, 4: See receiving_initial_metadata_ready() function
+ For 2, 3: See receiving_stream_ready() function */
+ gpr_atm recv_state;
};
grpc_tracer_flag grpc_call_error_trace =
@@ -318,6 +335,11 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
/* Always support no compression */
GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = args->server_transport_data == NULL;
+ if (call->is_client) {
+ GRPC_STATS_INC_CLIENT_CALLS_CREATED(exec_ctx);
+ } else {
+ GRPC_STATS_INC_SERVER_CALLS_CREATED(exec_ctx);
+ }
call->stream_op_payload.context = call->context;
grpc_slice path = grpc_empty_slice();
if (call->is_client) {
@@ -1400,11 +1422,12 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
- if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
- call->receiving_stream == NULL) {
+ /* If recv_state is RECV_NONE, we will save the batch_control
+ * object with rel_cas, and will not use it after the cas. Its corresponding
+ * acq_load is in receiving_initial_metadata_ready() */
+ if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL ||
+ !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
process_data_after_md(exec_ctx, bctlp);
- } else {
- call->saved_receiving_stream_ready_bctlp = bctlp;
}
}
@@ -1533,12 +1556,31 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
}
- call->has_initial_md_been_received = true;
- if (call->saved_receiving_stream_ready_bctlp != NULL) {
- grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE(
- receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
- grpc_schedule_on_exec_ctx);
- call->saved_receiving_stream_ready_bctlp = NULL;
+ grpc_closure *saved_rsr_closure = NULL;
+ while (true) {
+ gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
+ /* Should only receive initial metadata once */
+ GPR_ASSERT(rsr_bctlp != 1);
+ if (rsr_bctlp == 0) {
+ /* We haven't seen initial metadata and messages before, thus initial
+ * metadata is received first.
+ * no_barrier_cas is used, as this function won't access the batch_control
+ * object saved by receiving_stream_ready() if the initial metadata is
+ * received first. */
+ if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
+ RECV_INITIAL_METADATA_FIRST)) {
+ break;
+ }
+ } else {
+ /* Already received messages */
+ saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready,
+ (batch_control *)rsr_bctlp,
+ grpc_schedule_on_exec_ctx);
+ /* No need to modify recv_state */
+ break;
+ }
+ }
+ if (saved_rsr_closure != NULL) {
GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 185bfccb77..d537637cbb 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -19,6 +19,10 @@
#ifndef GRPC_CORE_LIB_SURFACE_CALL_H
#define GRPC_CORE_LIB_SURFACE_CALL_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/surface/api_trace.h"
@@ -26,10 +30,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
grpc_call *call, int success,
void *user_data);
@@ -89,7 +89,7 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag);
diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c
index 4443aba58a..4a1c265817 100644
--- a/src/core/lib/surface/call_log_batch.c
+++ b/src/core/lib/surface/call_log_batch.c
@@ -103,7 +103,7 @@ char *grpc_op_string(const grpc_op *op) {
return out;
}
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag) {
char *tmp;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index d199ac060e..898476daee 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -28,6 +28,7 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/handshaker_registry.h"
+#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/combiner.h"
@@ -118,6 +119,7 @@ void grpc_init(void) {
gpr_mu_lock(&g_init_mu);
if (++g_initializations == 1) {
gpr_time_init();
+ grpc_stats_init();
grpc_slice_intern_init();
grpc_mdctx_global_init();
grpc_channel_init_init();
@@ -186,6 +188,7 @@ void grpc_shutdown(void) {
grpc_mdctx_global_shutdown(&exec_ctx);
grpc_handshaker_factory_registry_shutdown(&exec_ctx);
grpc_slice_intern_shutdown();
+ grpc_stats_shutdown();
}
gpr_mu_unlock(&g_init_mu);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/transport/byte_stream.c b/src/core/lib/transport/byte_stream.c
index fb03a10315..08f61629a9 100644
--- a/src/core/lib/transport/byte_stream.c
+++ b/src/core/lib/transport/byte_stream.c
@@ -85,6 +85,7 @@ static void slice_buffer_stream_shutdown(grpc_exec_ctx *exec_ctx,
static void slice_buffer_stream_destroy(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *byte_stream) {
grpc_slice_buffer_stream *stream = (grpc_slice_buffer_stream *)byte_stream;
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, stream->backing_buffer);
GRPC_ERROR_UNREF(stream->shutdown_error);
}
diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h
index 1e1e8310b8..be2a35213e 100644
--- a/src/core/lib/transport/byte_stream.h
+++ b/src/core/lib/transport/byte_stream.h
@@ -81,7 +81,9 @@ void grpc_byte_stream_destroy(grpc_exec_ctx *exec_ctx,
// grpc_slice_buffer_stream
//
-// A grpc_byte_stream that wraps a slice buffer.
+// A grpc_byte_stream that wraps a slice buffer. The stream takes
+// ownership of the slices in the buffer, and on destruction will
+// reset the contents of the buffer.
typedef struct grpc_slice_buffer_stream {
grpc_byte_stream base;
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index 8f24b8527c..a077052561 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -105,6 +105,7 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch,
return GRPC_ERROR_NONE;
}
if (batch->idx.array[idx] == NULL) {
+ if (grpc_static_callout_is_default[idx]) ++batch->list.default_count;
batch->idx.array[idx] = storage;
return GRPC_ERROR_NONE;
}
@@ -120,6 +121,7 @@ static void maybe_unlink_callout(grpc_metadata_batch *batch,
if (idx == GRPC_BATCH_CALLOUTS_COUNT) {
return;
}
+ if (grpc_static_callout_is_default[idx]) --batch->list.default_count;
GPR_ASSERT(batch->idx.array[idx] != NULL);
batch->idx.array[idx] = NULL;
}
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 1b11a3e252..57d298c75c 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -41,6 +41,7 @@ typedef struct grpc_linked_mdelem {
typedef struct grpc_mdelem_list {
size_t count;
+ size_t default_count; // Number of default keys.
grpc_linked_mdelem *head;
grpc_linked_mdelem *tail;
} grpc_mdelem_list;
diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c
index 28f05d5c44..b20d94aeac 100644
--- a/src/core/lib/transport/static_metadata.c
+++ b/src/core/lib/transport/static_metadata.c
@@ -823,6 +823,31 @@ grpc_mdelem_data grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT] = {
{.refcount = &grpc_static_metadata_refcounts[97],
.data.refcounted = {g_bytes + 1040, 13}}},
};
+bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT] = {
+ true, // :path
+ true, // :method
+ true, // :status
+ true, // :authority
+ true, // :scheme
+ true, // te
+ true, // grpc-message
+ true, // grpc-status
+ true, // grpc-payload-bin
+ true, // grpc-encoding
+ true, // grpc-accept-encoding
+ true, // grpc-server-stats-bin
+ true, // grpc-tags-bin
+ true, // grpc-trace-bin
+ true, // content-type
+ true, // content-encoding
+ true, // accept-encoding
+ true, // grpc-internal-encoding-request
+ true, // grpc-internal-stream-encoding-request
+ true, // user-agent
+ true, // host
+ true, // lb-token
+};
+
const uint8_t grpc_static_accept_encoding_metadata[8] = {0, 76, 77, 78,
79, 80, 81, 82};
diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h
index 93ab90dff8..f03a9d23b1 100644
--- a/src/core/lib/transport/static_metadata.h
+++ b/src/core/lib/transport/static_metadata.h
@@ -571,6 +571,8 @@ typedef union {
GRPC_BATCH_CALLOUTS_COUNT) \
: GRPC_BATCH_CALLOUTS_COUNT)
+extern bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT];
+
extern const uint8_t grpc_static_accept_encoding_metadata[8];
#define GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(algs) \
(GRPC_MAKE_MDELEM( \
diff --git a/src/core/tsi/test_creds/BUILD b/src/core/tsi/test_creds/BUILD
index 4b0786d7b8..732f6d91b2 100644
--- a/src/core/tsi/test_creds/BUILD
+++ b/src/core/tsi/test_creds/BUILD
@@ -15,7 +15,15 @@
licenses(["notice"]) # Apache v2
exports_files([
- "ca.pem",
- "server1.key",
- "server1.pem",
+ "ca.pem",
+ "server1.key",
+ "server1.pem",
+ "server0.key",
+ "server0.pem",
+ "client.key",
+ "client.pem",
+ "badserver.key",
+ "badserver.pem",
+ "badclient.key",
+ "badclient.pem",
])
diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc
index f870af0c67..448d9fbcf2 100644
--- a/src/cpp/common/channel_filter.cc
+++ b/src/cpp/common/channel_filter.cc
@@ -18,7 +18,9 @@
#include <string.h>
+extern "C" {
#include "src/core/lib/channel/channel_stack.h"
+}
#include "src/cpp/common/channel_filter.h"
#include <grpc++/impl/codegen/slice.h>
diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h
index 5d629f7c14..c3d187d7e1 100644
--- a/src/cpp/common/channel_filter.h
+++ b/src/cpp/common/channel_filter.h
@@ -26,9 +26,11 @@
#include <functional>
#include <vector>
+extern "C" {
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/transport/metadata_batch.h"
+}
/// An interface to define filters.
///
@@ -193,6 +195,15 @@ class TransportStreamOpBatch {
op_->payload->send_message.send_message = send_message;
}
+ grpc_byte_stream **recv_message() const {
+ return op_->recv_message ? op_->payload->recv_message.recv_message
+ : nullptr;
+ }
+ void set_recv_message(grpc_byte_stream **recv_message) {
+ op_->recv_message = true;
+ op_->payload->recv_message.recv_message = recv_message;
+ }
+
census_context *get_census_context() const {
return (census_context *)op_->payload->context[GRPC_CONTEXT_TRACING].value;
}
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index c7c6b6b13b..6ea5f1d3c7 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -89,6 +89,10 @@ int CoreCodegen::gpr_cv_wait(gpr_cv* cv, gpr_mu* mu,
void CoreCodegen::gpr_cv_signal(gpr_cv* cv) { ::gpr_cv_signal(cv); }
void CoreCodegen::gpr_cv_broadcast(gpr_cv* cv) { ::gpr_cv_broadcast(cv); }
+grpc_byte_buffer* CoreCodegen::grpc_byte_buffer_copy(grpc_byte_buffer* bb) {
+ return ::grpc_byte_buffer_copy(bb);
+}
+
void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 2483300cb1..6bd3ecda32 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -17,6 +17,7 @@
#include <grpc++/server.h>
+#include <cstdlib>
#include <sstream>
#include <utility>
@@ -38,6 +39,7 @@
#include "src/core/ext/transport/inproc/inproc_transport.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/surface/call.h"
#include "src/cpp/client/create_channel_internal.h"
#include "src/cpp/server/health/default_health_check_service.h"
#include "src/cpp/thread_manager/thread_manager.h"
@@ -607,7 +609,12 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
grpc_op cops[MAX_OPS];
ops->FillOps(call->call(), cops, &nops);
auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
- GPR_ASSERT(GRPC_CALL_OK == result);
+ if (result != GRPC_CALL_OK) {
+ gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
+ grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,
+ call->call(), cops, nops, ops);
+ abort();
+ }
}
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
index 3a282b0526..7d073c9a84 100644
--- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
+++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec
@@ -101,7 +101,7 @@ Pod::Spec.new do |s|
s.preserve_paths = plugin
# Restrict the protoc version to the one supported by this plugin.
- s.dependency '!ProtoCompiler', '3.3.0'
+ s.dependency '!ProtoCompiler', '3.4.0'
# For the Protobuf dependency not to complain:
s.ios.deployment_target = '7.0'
s.osx.deployment_target = '10.9'
diff --git a/src/objective-c/!ProtoCompiler.podspec b/src/objective-c/!ProtoCompiler.podspec
index c3f95f9f42..25c437911f 100644
--- a/src/objective-c/!ProtoCompiler.podspec
+++ b/src/objective-c/!ProtoCompiler.podspec
@@ -36,7 +36,7 @@ Pod::Spec.new do |s|
# exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed
# before them.
s.name = '!ProtoCompiler'
- v = '3.3.0'
+ v = '3.4.0'
s.version = v
s.summary = 'The Protobuf Compiler (protoc) generates Objective-C files from .proto files'
s.description = <<-DESC
diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c
index 4ed56de993..0f2c5b8114 100644
--- a/src/php/ext/grpc/php_grpc.c
+++ b/src/php/ext/grpc/php_grpc.c
@@ -49,9 +49,7 @@ const zend_function_entry grpc_functions[] = {
/* {{{ grpc_module_entry
*/
zend_module_entry grpc_module_entry = {
-#if ZEND_MODULE_API_NO >= 20010901
STANDARD_MODULE_HEADER,
-#endif
"grpc",
grpc_functions,
PHP_MINIT(grpc),
@@ -59,9 +57,7 @@ zend_module_entry grpc_module_entry = {
PHP_RINIT(grpc),
NULL,
PHP_MINFO(grpc),
-#if ZEND_MODULE_API_NO >= 20010901
PHP_GRPC_VERSION,
-#endif
PHP_MODULE_GLOBALS(grpc),
PHP_GINIT(grpc),
NULL,
diff --git a/src/proto/grpc/health/v1/BUILD b/src/proto/grpc/health/v1/BUILD
index 6f6779594b..d234842883 100644
--- a/src/proto/grpc/health/v1/BUILD
+++ b/src/proto/grpc/health/v1/BUILD
@@ -14,15 +14,9 @@
licenses(["notice"]) # Apache v2
-package(
- default_visibility = ["//visibility:public"],
- features = [
- "-layering_check",
- "-parse_headers",
- ],
-)
+load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
-load("//bazel:grpc_build_system.bzl", "grpc_proto_library")
+grpc_package(name = "health", visibility = "public")
grpc_proto_library(
name = "health_proto",
diff --git a/src/proto/grpc/lb/v1/BUILD b/src/proto/grpc/lb/v1/BUILD
index 61b28ee3bb..15bf3c3233 100644
--- a/src/proto/grpc/lb/v1/BUILD
+++ b/src/proto/grpc/lb/v1/BUILD
@@ -14,15 +14,9 @@
licenses(["notice"]) # Apache v2
-package(
- default_visibility = ["//visibility:public"],
- features = [
- "-layering_check",
- "-parse_headers",
- ],
-)
+load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
-load("//bazel:grpc_build_system.bzl", "grpc_proto_library")
+grpc_package(name = "lb", visibility = "public")
grpc_proto_library(
name = "load_balancer_proto",
diff --git a/src/proto/grpc/reflection/v1alpha/BUILD b/src/proto/grpc/reflection/v1alpha/BUILD
index b60784e9d9..4605418447 100644
--- a/src/proto/grpc/reflection/v1alpha/BUILD
+++ b/src/proto/grpc/reflection/v1alpha/BUILD
@@ -14,15 +14,9 @@
licenses(["notice"]) # Apache v2
-package(
- default_visibility = ["//visibility:public"],
- features = [
- "-layering_check",
- "-parse_headers",
- ],
-)
+load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
-load("//bazel:grpc_build_system.bzl", "grpc_proto_library")
+grpc_package(name = "reflection", visibility = "public")
grpc_proto_library(
name = "reflection_proto",
diff --git a/src/proto/grpc/status/BUILD b/src/proto/grpc/status/BUILD
index 61688e5f96..14315d36e3 100644
--- a/src/proto/grpc/status/BUILD
+++ b/src/proto/grpc/status/BUILD
@@ -14,15 +14,9 @@
licenses(["notice"]) # Apache v2
-package(
- default_visibility = ["//visibility:public"],
- features = [
- "-layering_check",
- "-parse_headers",
- ],
-)
+load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
-load("//bazel:grpc_build_system.bzl", "grpc_proto_library")
+grpc_package(name = "status", visibility = "public")
grpc_proto_library(
name = "status_proto",
diff --git a/src/proto/grpc/testing/BUILD b/src/proto/grpc/testing/BUILD
index c8e7d03cb0..07e08117f0 100644
--- a/src/proto/grpc/testing/BUILD
+++ b/src/proto/grpc/testing/BUILD
@@ -14,15 +14,9 @@
licenses(["notice"]) # Apache v2
-package(
- default_visibility = ["//visibility:public"],
- features = [
- "-layering_check",
- "-parse_headers",
- ],
-)
+load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
-load("//bazel:grpc_build_system.bzl", "grpc_proto_library")
+grpc_package(name = "testing", visibility = "public")
grpc_proto_library(
name = "compiler_test_proto",
diff --git a/src/proto/grpc/testing/duplicate/BUILD b/src/proto/grpc/testing/duplicate/BUILD
index 8f91710999..714c9a7518 100644
--- a/src/proto/grpc/testing/duplicate/BUILD
+++ b/src/proto/grpc/testing/duplicate/BUILD
@@ -14,15 +14,9 @@
licenses(["notice"]) # Apache v2
-package(
- default_visibility = ["//visibility:public"],
- features = [
- "-layering_check",
- "-parse_headers",
- ],
-)
+load("//bazel:grpc_build_system.bzl", "grpc_proto_library", "grpc_package")
-load("//bazel:grpc_build_system.bzl", "grpc_proto_library")
+grpc_package(name = "duplicate", visibility = "public")
grpc_proto_library(
name = "echo_duplicate_proto",
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index dc4d28f95b..1cbf345ab6 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -72,6 +72,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/compression/compression.c',
'src/core/lib/compression/message_compress.c',
'src/core/lib/compression/stream_compression.c',
+ 'src/core/lib/debug/stats.c',
+ 'src/core/lib/debug/stats_data.c',
'src/core/lib/http/format_request.c',
'src/core/lib/http/httpcli.c',
'src/core/lib/http/parser.c',
diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py
index 14e25f09e2..917e11808e 100644
--- a/src/python/grpcio_testing/grpc_testing/__init__.py
+++ b/src/python/grpcio_testing/grpc_testing/__init__.py
@@ -293,6 +293,278 @@ class Channel(six.with_metaclass(abc.ABCMeta), grpc.Channel):
raise NotImplementedError()
+class UnaryUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-unary RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (response, trailing_metadata, code, details) sequence with the RPC's
+ response, trailing metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class UnaryStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a unary-stream RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_response(self):
+ """Draws one of the responses added to the RPC by the system under test.
+
+ Successive calls to this method return responses in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A response message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (trailing_metadata, code, details) sequence with the RPC's trailing
+ metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class StreamUnaryServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-unary RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_request(self, request):
+ """Sends a request to the system under test.
+
+ Args:
+ request: A request message for the RPC to be "sent" to the system
+ under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Indicates the end of the RPC's request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (response, trailing_metadata, code, details) sequence with the RPC's
+ response, trailing metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class StreamStreamServerRpc(six.with_metaclass(abc.ABCMeta)):
+ """Fixture for a stream-stream RPC serviced by a system under test.
+
+ Enables users to "play client" for the RPC.
+ """
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ """Accesses the initial metadata emitted by the system under test.
+
+ This method blocks until the system under test has added initial
+ metadata to the RPC (or has provided one or more response messages or
+ has terminated the RPC, either of which will cause gRPC Python to
+ synthesize initial metadata for the RPC).
+
+ Returns:
+ The initial metadata for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_request(self, request):
+ """Sends a request to the system under test.
+
+ Args:
+ request: A request message for the RPC to be "sent" to the system
+ under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ """Indicates the end of the RPC's request stream."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_response(self):
+ """Draws one of the responses added to the RPC by the system under test.
+
+ Successive calls to this method return responses in the same order in
+ which the system under test added them to the RPC.
+
+ Returns:
+ A response message added to the RPC by the system under test.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ """Cancels the RPC."""
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def termination(self):
+ """Blocks until the system under test has terminated the RPC.
+
+ Returns:
+ A (trailing_metadata, code, details) sequence with the RPC's trailing
+ metadata, code, and details.
+ """
+ raise NotImplementedError()
+
+
+class Server(six.with_metaclass(abc.ABCMeta)):
+ """A server with which to test a system that services RPCs."""
+
+ @abc.abstractmethod
+ def invoke_unary_unary(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a unary-unary
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ request: The RPC's request.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A UnaryUnaryServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_unary_stream(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a unary-stream
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ request: The RPC's request.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A UnaryStreamServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_unary(
+ self, method_descriptor, invocation_metadata, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a stream-unary
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A StreamUnaryServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_stream(
+ self, method_descriptor, invocation_metadata, timeout):
+ """Invokes an RPC to be serviced by the system under test.
+
+ Args:
+ method_descriptor: A descriptor.MethodDescriptor describing a stream-stream
+ RPC method.
+ invocation_metadata: The RPC's invocation metadata.
+ timeout: A duration of time in seconds for the RPC or None to
+ indicate that the RPC has no time limit.
+
+ Returns:
+ A StreamStreamServerRpc with which to "play client" for the RPC.
+ """
+ raise NotImplementedError()
+
+
class Time(six.with_metaclass(abc.ABCMeta)):
"""A simulation of time.
@@ -406,3 +678,20 @@ def channel(service_descriptors, time):
"""
from grpc_testing import _channel
return _channel.testing_channel(service_descriptors, time)
+
+
+def server_from_dictionary(descriptors_to_servicers, time):
+ """Creates a Server for use in tests of a gRPC Python-using system.
+
+ Args:
+ descriptors_to_servicers: A dictionary from descriptor.ServiceDescriptors
+ defining RPC services to servicer objects (usually instances of classes
+ that implement "Servicer" interfaces defined in generated "_pb2_grpc"
+ modules) implementing those services.
+ time: A Time to be used for tests.
+
+ Returns:
+ A Server for use in tests.
+ """
+ from grpc_testing import _server
+ return _server.server_from_dictionary(descriptors_to_servicers, time)
diff --git a/src/python/grpcio_testing/grpc_testing/_common.py b/src/python/grpcio_testing/grpc_testing/_common.py
index cb4a7f5fa2..1517434ca7 100644
--- a/src/python/grpcio_testing/grpc_testing/_common.py
+++ b/src/python/grpcio_testing/grpc_testing/_common.py
@@ -37,6 +37,16 @@ def fuss_with_metadata(metadata):
return _fuss(tuple(metadata))
+def rpc_names(service_descriptors):
+ rpc_names_to_descriptors = {}
+ for service_descriptor in service_descriptors:
+ for method_descriptor in service_descriptor.methods_by_name.values():
+ rpc_name = '/{}/{}'.format(
+ service_descriptor.full_name, method_descriptor.name)
+ rpc_names_to_descriptors[rpc_name] = method_descriptor
+ return rpc_names_to_descriptors
+
+
class ChannelRpcRead(
collections.namedtuple(
'ChannelRpcRead',
@@ -90,3 +100,61 @@ class ChannelHandler(six.with_metaclass(abc.ABCMeta)):
self, method_full_rpc_name, invocation_metadata, requests,
requests_closed, timeout):
raise NotImplementedError()
+
+
+class ServerRpcRead(
+ collections.namedtuple('ServerRpcRead',
+ ('request', 'requests_closed', 'terminated',))):
+ pass
+
+
+REQUESTS_CLOSED = ServerRpcRead(None, True, False)
+TERMINATED = ServerRpcRead(None, False, True)
+
+
+class ServerRpcHandler(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def send_initial_metadata(self, initial_metadata):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_request(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_response(self, response):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def send_termination(self, trailing_metadata, code, details):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_termination_callback(self, callback):
+ raise NotImplementedError()
+
+
+class Serverish(six.with_metaclass(abc.ABCMeta)):
+
+ @abc.abstractmethod
+ def invoke_unary_unary(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_unary_stream(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_unary(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def invoke_stream_stream(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ raise NotImplementedError()
diff --git a/src/python/grpcio_testing/grpc_testing/_server/__init__.py b/src/python/grpcio_testing/grpc_testing/_server/__init__.py
new file mode 100644
index 0000000000..759512949a
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/__init__.py
@@ -0,0 +1,20 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from grpc_testing._server import _server
+
+
+def server_from_dictionary(descriptors_to_servicers, time):
+ return _server.server_from_descriptor_to_servicers(
+ descriptors_to_servicers, time)
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_handler.py b/src/python/grpcio_testing/grpc_testing/_server/_handler.py
new file mode 100644
index 0000000000..b47e04c718
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_handler.py
@@ -0,0 +1,215 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import threading
+
+import grpc
+from grpc_testing import _common
+
+_CLIENT_INACTIVE = object()
+
+
+class Handler(_common.ServerRpcHandler):
+
+ @abc.abstractmethod
+ def initial_metadata(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def add_request(self, request):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def take_response(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def requests_closed(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def cancel(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def unary_response_termination(self):
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def stream_response_termination(self):
+ raise NotImplementedError()
+
+
+class _Handler(Handler):
+
+ def __init__(self, requests_closed):
+ self._condition = threading.Condition()
+ self._requests = []
+ self._requests_closed = requests_closed
+ self._initial_metadata = None
+ self._responses = []
+ self._trailing_metadata = None
+ self._code = None
+ self._details = None
+ self._unary_response = None
+ self._expiration_future = None
+ self._termination_callbacks = []
+
+ def send_initial_metadata(self, initial_metadata):
+ with self._condition:
+ self._initial_metadata = initial_metadata
+ self._condition.notify_all()
+
+ def take_request(self):
+ with self._condition:
+ while True:
+ if self._code is None:
+ if self._requests:
+ request = self._requests.pop(0)
+ self._condition.notify_all()
+ return _common.ServerRpcRead(request, False, False)
+ elif self._requests_closed:
+ return _common.REQUESTS_CLOSED
+ else:
+ self._condition.wait()
+ else:
+ return _common.TERMINATED
+
+ def is_active(self):
+ with self._condition:
+ return self._code is None
+
+ def add_response(self, response):
+ with self._condition:
+ self._responses.append(response)
+ self._condition.notify_all()
+
+ def send_termination(self, trailing_metadata, code, details):
+ with self._condition:
+ self._trailing_metadata = trailing_metadata
+ self._code = code
+ self._details = details
+ if self._expiration_future is not None:
+ self._expiration_future.cancel()
+ self._condition.notify_all()
+
+ def add_termination_callback(self, termination_callback):
+ with self._condition:
+ if self._code is None:
+ self._termination_callbacks.append(termination_callback)
+ return True
+ else:
+ return False
+
+ def initial_metadata(self):
+ with self._condition:
+ while True:
+ if self._initial_metadata is None:
+ if self._code is None:
+ self._condition.wait()
+ else:
+ raise ValueError(
+ 'No initial metadata despite status code!')
+ else:
+ return self._initial_metadata
+
+ def add_request(self, request):
+ with self._condition:
+ self._requests.append(request)
+ self._condition.notify_all()
+
+ def take_response(self):
+ with self._condition:
+ while True:
+ if self._responses:
+ response = self._responses.pop(0)
+ self._condition.notify_all()
+ return response
+ elif self._code is None:
+ self._condition.wait()
+ else:
+ raise ValueError('No more responses!')
+
+ def requests_closed(self):
+ with self._condition:
+ self._requests_closed = True
+ self._condition.notify_all()
+
+ def cancel(self):
+ with self._condition:
+ if self._code is None:
+ self._code = _CLIENT_INACTIVE
+ termination_callbacks = self._termination_callbacks
+ self._termination_callbacks = None
+ if self._expiration_future is not None:
+ self._expiration_future.cancel()
+ self._condition.notify_all()
+ for termination_callback in termination_callbacks:
+ termination_callback()
+
+ def unary_response_termination(self):
+ with self._condition:
+ while True:
+ if self._code is _CLIENT_INACTIVE:
+ raise ValueError('Huh? Cancelled but wanting status?')
+ elif self._code is None:
+ self._condition.wait()
+ else:
+ if self._unary_response is None:
+ if self._responses:
+ self._unary_response = self._responses.pop(0)
+ return (
+ self._unary_response, self._trailing_metadata,
+ self._code, self._details,)
+
+
+ def stream_response_termination(self):
+ with self._condition:
+ while True:
+ if self._code is _CLIENT_INACTIVE:
+ raise ValueError('Huh? Cancelled but wanting status?')
+ elif self._code is None:
+ self._condition.wait()
+ else:
+ return self._trailing_metadata, self._code, self._details,
+
+ def expire(self):
+ with self._condition:
+ if self._code is None:
+ if self._initial_metadata is None:
+ self._initial_metadata = _common.FUSSED_EMPTY_METADATA
+ self._trailing_metadata = _common.FUSSED_EMPTY_METADATA
+ self._code = grpc.StatusCode.DEADLINE_EXCEEDED
+ self._details = 'Took too much time!'
+ termination_callbacks = self._termination_callbacks
+ self._termination_callbacks = None
+ self._condition.notify_all()
+ for termination_callback in termination_callbacks:
+ termination_callback()
+
+ def set_expiration_future(self, expiration_future):
+ with self._condition:
+ self._expiration_future = expiration_future
+
+
+def handler_without_deadline(requests_closed):
+ return _Handler(requests_closed)
+
+
+def handler_with_deadline(requests_closed, time, deadline):
+ handler = _Handler(requests_closed)
+ expiration_future = time.call_at(handler.expire, deadline)
+ handler.set_expiration_future(expiration_future)
+ return handler
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py
new file mode 100644
index 0000000000..f81876f4b2
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_rpc.py
@@ -0,0 +1,153 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+
+import grpc
+from grpc_testing import _common
+
+
+class Rpc(object):
+
+ def __init__(self, handler, invocation_metadata):
+ self._condition = threading.Condition()
+ self._handler = handler
+ self._invocation_metadata = invocation_metadata
+ self._initial_metadata_sent = False
+ self._pending_trailing_metadata = None
+ self._pending_code = None
+ self._pending_details = None
+ self._callbacks = []
+ self._active = True
+ self._rpc_errors = []
+
+ def _ensure_initial_metadata_sent(self):
+ if not self._initial_metadata_sent:
+ self._handler.send_initial_metadata(_common.FUSSED_EMPTY_METADATA)
+ self._initial_metadata_sent = True
+
+ def _call_back(self):
+ callbacks = tuple(self._callbacks)
+ self._callbacks = None
+
+ def call_back():
+ for callback in callbacks:
+ try:
+ callback()
+ except Exception: # pylint: disable=broad-except
+ logging.exception('Exception calling server-side callback!')
+
+ callback_calling_thread = threading.Thread(target=call_back)
+ callback_calling_thread.start()
+
+ def _terminate(self, trailing_metadata, code, details):
+ if self._active:
+ self._active = False
+ self._handler.send_termination(trailing_metadata, code, details)
+ self._call_back()
+ self._condition.notify_all()
+
+ def _complete(self):
+ if self._pending_trailing_metadata is None:
+ trailing_metadata = _common.FUSSED_EMPTY_METADATA
+ else:
+ trailing_metadata = self._pending_trailing_metadata
+ if self._pending_code is None:
+ code = grpc.StatusCode.OK
+ else:
+ code = self._pending_code
+ details = '' if self._pending_details is None else self._pending_details
+ self._terminate(trailing_metadata, code, details)
+
+ def _abort(self, code, details):
+ self._terminate(_common.FUSSED_EMPTY_METADATA, code, details)
+
+ def add_rpc_error(self, rpc_error):
+ with self._condition:
+ self._rpc_errors.append(rpc_error)
+
+ def application_cancel(self):
+ with self._condition:
+ self._abort(
+ grpc.StatusCode.CANCELLED,
+ 'Cancelled by server-side application!')
+
+ def application_exception_abort(self, exception):
+ with self._condition:
+ if exception not in self._rpc_errors:
+ logging.exception('Exception calling application!')
+ self._abort(
+ grpc.StatusCode.UNKNOWN,
+ 'Exception calling application: {}'.format(exception))
+
+ def extrinsic_abort(self):
+ with self._condition:
+ if self._active:
+ self._active = False
+ self._call_back()
+ self._condition.notify_all()
+
+ def unary_response_complete(self, response):
+ with self._condition:
+ self._ensure_initial_metadata_sent()
+ self._handler.add_response(response)
+ self._complete()
+
+ def stream_response(self, response):
+ with self._condition:
+ self._ensure_initial_metadata_sent()
+ self._handler.add_response(response)
+
+ def stream_response_complete(self):
+ with self._condition:
+ self._ensure_initial_metadata_sent()
+ self._complete()
+
+ def send_initial_metadata(self, initial_metadata):
+ with self._condition:
+ if self._initial_metadata_sent:
+ return False
+ else:
+ self._handler.send_initial_metadata(initial_metadata)
+ self._initial_metadata_sent = True
+ return True
+
+ def is_active(self):
+ with self._condition:
+ return self._active
+
+ def add_callback(self, callback):
+ with self._condition:
+ if self._callbacks is None:
+ return False
+ else:
+ self._callbacks.append(callback)
+ return True
+
+ def invocation_metadata(self):
+ with self._condition:
+ return self._invocation_metadata
+
+ def set_trailing_metadata(self, trailing_metadata):
+ with self._condition:
+ self._pending_trailing_metadata = trailing_metadata
+
+ def set_code(self, code):
+ with self._condition:
+ self._pending_code = code
+
+ def set_details(self, details):
+ with self._condition:
+ self._pending_details = details
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_server.py b/src/python/grpcio_testing/grpc_testing/_server/_server.py
new file mode 100644
index 0000000000..66bcfc13c0
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_server.py
@@ -0,0 +1,149 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+
+import grpc_testing
+from grpc_testing import _common
+from grpc_testing._server import _handler
+from grpc_testing._server import _rpc
+from grpc_testing._server import _server_rpc
+from grpc_testing._server import _service
+from grpc_testing._server import _servicer_context
+
+
+def _implementation(descriptors_to_servicers, method_descriptor):
+ servicer = descriptors_to_servicers[method_descriptor.containing_service]
+ return getattr(servicer, method_descriptor.name)
+
+
+def _unary_unary_service(request):
+ def service(implementation, rpc, servicer_context):
+ _service.unary_unary(
+ implementation, rpc, request, servicer_context)
+ return service
+
+
+def _unary_stream_service(request):
+ def service(implementation, rpc, servicer_context):
+ _service.unary_stream(
+ implementation, rpc, request, servicer_context)
+ return service
+
+
+def _stream_unary_service(handler):
+ def service(implementation, rpc, servicer_context):
+ _service.stream_unary(implementation, rpc, handler, servicer_context)
+ return service
+
+
+def _stream_stream_service(handler):
+ def service(implementation, rpc, servicer_context):
+ _service.stream_stream(implementation, rpc, handler, servicer_context)
+ return service
+
+
+class _Serverish(_common.Serverish):
+
+ def __init__(self, descriptors_to_servicers, time):
+ self._descriptors_to_servicers = descriptors_to_servicers
+ self._time = time
+
+ def _invoke(
+ self, service_behavior, method_descriptor, handler,
+ invocation_metadata, deadline):
+ implementation = _implementation(
+ self._descriptors_to_servicers, method_descriptor)
+ rpc = _rpc.Rpc(handler, invocation_metadata)
+ if handler.add_termination_callback(rpc.extrinsic_abort):
+ servicer_context = _servicer_context.ServicerContext(
+ rpc, self._time, deadline)
+ service_thread = threading.Thread(
+ target=service_behavior,
+ args=(implementation, rpc, servicer_context,))
+ service_thread.start()
+
+ def invoke_unary_unary(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ self._invoke(
+ _unary_unary_service(request), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+ def invoke_unary_stream(
+ self, method_descriptor, handler, invocation_metadata, request,
+ deadline):
+ self._invoke(
+ _unary_stream_service(request), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+ def invoke_stream_unary(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ self._invoke(
+ _stream_unary_service(handler), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+ def invoke_stream_stream(
+ self, method_descriptor, handler, invocation_metadata, deadline):
+ self._invoke(
+ _stream_stream_service(handler), method_descriptor, handler,
+ invocation_metadata, deadline)
+
+
+def _deadline_and_handler(requests_closed, time, timeout):
+ if timeout is None:
+ return None, _handler.handler_without_deadline(requests_closed)
+ else:
+ deadline = time.time() + timeout
+ handler = _handler.handler_with_deadline(requests_closed, time, deadline)
+ return deadline, handler
+
+
+class _Server(grpc_testing.Server):
+
+ def __init__(self, serverish, time):
+ self._serverish = serverish
+ self._time = time
+
+ def invoke_unary_unary(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ deadline, handler = _deadline_and_handler(True, self._time, timeout)
+ self._serverish.invoke_unary_unary(
+ method_descriptor, handler, invocation_metadata, request, deadline)
+ return _server_rpc.UnaryUnaryServerRpc(handler)
+
+ def invoke_unary_stream(
+ self, method_descriptor, invocation_metadata, request, timeout):
+ deadline, handler = _deadline_and_handler(True, self._time, timeout)
+ self._serverish.invoke_unary_stream(
+ method_descriptor, handler, invocation_metadata, request, deadline)
+ return _server_rpc.UnaryStreamServerRpc(handler)
+
+ def invoke_stream_unary(
+ self, method_descriptor, invocation_metadata, timeout):
+ deadline, handler = _deadline_and_handler(False, self._time, timeout)
+ self._serverish.invoke_stream_unary(
+ method_descriptor, handler, invocation_metadata, deadline)
+ return _server_rpc.StreamUnaryServerRpc(handler)
+
+ def invoke_stream_stream(
+ self, method_descriptor, invocation_metadata, timeout):
+ deadline, handler = _deadline_and_handler(False, self._time, timeout)
+ self._serverish.invoke_stream_stream(
+ method_descriptor, handler, invocation_metadata, deadline)
+ return _server_rpc.StreamStreamServerRpc(handler)
+
+
+def server_from_descriptor_to_servicers(descriptors_to_servicers, time):
+ return _Server(_Serverish(descriptors_to_servicers, time), time)
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py b/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py
new file mode 100644
index 0000000000..30de8ff0e2
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_server_rpc.py
@@ -0,0 +1,93 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc_testing
+
+
+class UnaryUnaryServerRpc(grpc_testing.UnaryUnaryServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.unary_response_termination()
+
+
+class UnaryStreamServerRpc(grpc_testing.UnaryStreamServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def take_response(self):
+ return self._handler.take_response()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.stream_response_termination()
+
+
+class StreamUnaryServerRpc(grpc_testing.StreamUnaryServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def send_request(self, request):
+ self._handler.add_request(request)
+
+ def requests_closed(self):
+ self._handler.requests_closed()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.unary_response_termination()
+
+
+class StreamStreamServerRpc(grpc_testing.StreamStreamServerRpc):
+
+ def __init__(self, handler):
+ self._handler = handler
+
+ def initial_metadata(self):
+ return self._handler.initial_metadata()
+
+ def send_request(self, request):
+ self._handler.add_request(request)
+
+ def requests_closed(self):
+ self._handler.requests_closed()
+
+ def take_response(self):
+ return self._handler.take_response()
+
+ def cancel(self):
+ self._handler.cancel()
+
+ def termination(self):
+ return self._handler.stream_response_termination()
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_service.py b/src/python/grpcio_testing/grpc_testing/_server/_service.py
new file mode 100644
index 0000000000..36b0a2f7ff
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_service.py
@@ -0,0 +1,88 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+
+
+class _RequestIterator(object):
+
+ def __init__(self, rpc, handler):
+ self._rpc = rpc
+ self._handler = handler
+
+ def _next(self):
+ read = self._handler.take_request()
+ if read.requests_closed:
+ raise StopIteration()
+ elif read.terminated:
+ rpc_error = grpc.RpcError()
+ self._rpc.add_rpc_error(rpc_error)
+ raise rpc_error
+ else:
+ return read.request
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ return self._next()
+
+ def next(self):
+ return self._next()
+
+
+def _unary_response(argument, implementation, rpc, servicer_context):
+ try:
+ response = implementation(argument, servicer_context)
+ except Exception as exception: # pylint: disable=broad-except
+ rpc.application_exception_abort(exception)
+ else:
+ rpc.unary_response_complete(response)
+
+
+def _stream_response(argument, implementation, rpc, servicer_context):
+ try:
+ response_iterator = implementation(argument, servicer_context)
+ except Exception as exception: # pylint: disable=broad-except
+ rpc.application_exception_abort(exception)
+ else:
+ while True:
+ try:
+ response = next(response_iterator)
+ except StopIteration:
+ rpc.stream_response_complete()
+ break
+ except Exception as exception: # pylint: disable=broad-except
+ rpc.application_exception_abort(exception)
+ break
+ else:
+ rpc.stream_response(response)
+
+
+def unary_unary(implementation, rpc, request, servicer_context):
+ _unary_response(request, implementation, rpc, servicer_context)
+
+
+def unary_stream(implementation, rpc, request, servicer_context):
+ _stream_response(request, implementation, rpc, servicer_context)
+
+
+def stream_unary(implementation, rpc, handler, servicer_context):
+ _unary_response(
+ _RequestIterator(rpc, handler), implementation, rpc, servicer_context)
+
+
+def stream_stream(implementation, rpc, handler, servicer_context):
+ _stream_response(
+ _RequestIterator(rpc, handler), implementation, rpc, servicer_context)
diff --git a/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py
new file mode 100644
index 0000000000..496689ded0
--- /dev/null
+++ b/src/python/grpcio_testing/grpc_testing/_server/_servicer_context.py
@@ -0,0 +1,74 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import grpc
+from grpc_testing import _common
+
+
+class ServicerContext(grpc.ServicerContext):
+
+ def __init__(self, rpc, time, deadline):
+ self._rpc = rpc
+ self._time = time
+ self._deadline = deadline
+
+ def is_active(self):
+ return self._rpc.is_active()
+
+ def time_remaining(self):
+ if self._rpc.is_active():
+ if self._deadline is None:
+ return None
+ else:
+ return max(0.0, self._deadline - self._time.time())
+ else:
+ return 0.0
+
+ def cancel(self):
+ self._rpc.application_cancel()
+
+ def add_callback(self, callback):
+ return self._rpc.add_callback(callback)
+
+ def invocation_metadata(self):
+ return self._rpc.invocation_metadata()
+
+ def peer(self):
+ raise NotImplementedError()
+
+ def peer_identities(self):
+ raise NotImplementedError()
+
+ def peer_identity_key(self):
+ raise NotImplementedError()
+
+ def auth_context(self):
+ raise NotImplementedError()
+
+ def send_initial_metadata(self, initial_metadata):
+ initial_metadata_sent = self._rpc.send_initial_metadata(
+ _common.fuss_with_metadata(initial_metadata))
+ if not initial_metadata_sent:
+ raise ValueError(
+ 'ServicerContext.send_initial_metadata called too late!')
+
+ def set_trailing_metadata(self, trailing_metadata):
+ self._rpc.set_trailing_metadata(
+ _common.fuss_with_metadata(trailing_metadata))
+
+ def set_code(self, code):
+ self._rpc.set_code(code)
+
+ def set_details(self, details):
+ self._rpc.set_details(details)
diff --git a/src/python/grpcio_tests/tests/testing/_server_application.py b/src/python/grpcio_tests/tests/testing/_server_application.py
new file mode 100644
index 0000000000..06f09c8cb4
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_server_application.py
@@ -0,0 +1,66 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""An example gRPC Python-using server-side application."""
+
+import grpc
+
+# requests_pb2 is a semantic dependency of this module.
+from tests.testing import _application_common
+from tests.testing.proto import requests_pb2 # pylint: disable=unused-import
+from tests.testing.proto import services_pb2
+from tests.testing.proto import services_pb2_grpc
+
+
+class FirstServiceServicer(services_pb2_grpc.FirstServiceServicer):
+ """Services RPCs."""
+
+ def UnUn(self, request, context):
+ if _application_common.UNARY_UNARY_REQUEST == request:
+ return _application_common.UNARY_UNARY_RESPONSE
+ else:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return services_pb2.Down()
+
+ def UnStre(self, request, context):
+ if _application_common.UNARY_STREAM_REQUEST != request:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return
+ yield services_pb2.Strange()
+
+ def StreUn(self, request_iterator, context):
+ context.send_initial_metadata((
+ ('server_application_metadata_key', 'Hi there!',),))
+ for request in request_iterator:
+ if request != _application_common.STREAM_UNARY_REQUEST:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return services_pb2.Strange()
+ elif not context.is_active():
+ return services_pb2.Strange()
+ else:
+ return _application_common.STREAM_UNARY_RESPONSE
+
+ def StreStre(self, request_iterator, context):
+ for request in request_iterator:
+ if request != _application_common.STREAM_STREAM_REQUEST:
+ context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
+ context.set_details('Something is wrong with your request!')
+ return
+ elif not context.is_active():
+ return
+ else:
+ yield _application_common.STREAM_STREAM_RESPONSE
+ yield _application_common.STREAM_STREAM_RESPONSE
diff --git a/src/python/grpcio_tests/tests/testing/_server_test.py b/src/python/grpcio_tests/tests/testing/_server_test.py
new file mode 100644
index 0000000000..7897bcce01
--- /dev/null
+++ b/src/python/grpcio_tests/tests/testing/_server_test.py
@@ -0,0 +1,169 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import unittest
+
+import grpc
+import grpc_testing
+
+from tests.testing import _application_common
+from tests.testing import _application_testing_common
+from tests.testing import _server_application
+from tests.testing.proto import services_pb2
+
+
+# TODO(https://github.com/google/protobuf/issues/3452): Drop this skip.
+@unittest.skipIf(
+ services_pb2.DESCRIPTOR.services_by_name.get('FirstService') is None,
+ 'Fix protobuf issue 3452!')
+class FirstServiceServicerTest(unittest.TestCase):
+
+ def setUp(self):
+ self._real_time = grpc_testing.strict_real_time()
+ self._fake_time = grpc_testing.strict_fake_time(time.time())
+ servicer = _server_application.FirstServiceServicer()
+ descriptors_to_servicers = {
+ _application_testing_common.FIRST_SERVICE: servicer
+ }
+ self._real_time_server = grpc_testing.server_from_dictionary(
+ descriptors_to_servicers, self._real_time)
+ self._fake_time_server = grpc_testing.server_from_dictionary(
+ descriptors_to_servicers, self._fake_time)
+
+ def test_successful_unary_unary(self):
+ rpc = self._real_time_server.invoke_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN, (),
+ _application_common.UNARY_UNARY_REQUEST, None)
+ initial_metadata = rpc.initial_metadata()
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_successful_unary_stream(self):
+ rpc = self._real_time_server.invoke_unary_stream(
+ _application_testing_common.FIRST_SERVICE_UNSTRE, (),
+ _application_common.UNARY_STREAM_REQUEST, None)
+ initial_metadata = rpc.initial_metadata()
+ trailing_metadata, code, details = rpc.termination()
+
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_successful_stream_unary(self):
+ rpc = self._real_time_server.invoke_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN, (), None)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.requests_closed()
+ initial_metadata = rpc.initial_metadata()
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertEqual(_application_common.STREAM_UNARY_RESPONSE, response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_successful_stream_stream(self):
+ rpc = self._real_time_server.invoke_stream_stream(
+ _application_testing_common.FIRST_SERVICE_STRESTRE, (), None)
+ rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
+ initial_metadata = rpc.initial_metadata()
+ responses = [
+ rpc.take_response(),
+ rpc.take_response(),
+ ]
+ rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
+ rpc.send_request(_application_common.STREAM_STREAM_REQUEST)
+ responses.extend([
+ rpc.take_response(),
+ rpc.take_response(),
+ rpc.take_response(),
+ rpc.take_response(),
+ ])
+ rpc.requests_closed()
+ trailing_metadata, code, details = rpc.termination()
+
+ for response in responses:
+ self.assertEqual(_application_common.STREAM_STREAM_RESPONSE,
+ response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_server_rpc_idempotence(self):
+ rpc = self._real_time_server.invoke_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN, (),
+ _application_common.UNARY_UNARY_REQUEST, None)
+ first_initial_metadata = rpc.initial_metadata()
+ second_initial_metadata = rpc.initial_metadata()
+ third_initial_metadata = rpc.initial_metadata()
+ first_termination = rpc.termination()
+ second_termination = rpc.termination()
+ third_termination = rpc.termination()
+
+ for later_initial_metadata in (second_initial_metadata,
+ third_initial_metadata,):
+ self.assertEqual(first_initial_metadata, later_initial_metadata)
+ response = first_termination[0]
+ terminal_metadata = first_termination[1]
+ code = first_termination[2]
+ details = first_termination[3]
+ for later_termination in (second_termination, third_termination,):
+ self.assertEqual(response, later_termination[0])
+ self.assertEqual(terminal_metadata, later_termination[1])
+ self.assertIs(code, later_termination[2])
+ self.assertEqual(details, later_termination[3])
+ self.assertEqual(_application_common.UNARY_UNARY_RESPONSE, response)
+ self.assertIs(code, grpc.StatusCode.OK)
+
+ def test_misbehaving_client_unary_unary(self):
+ rpc = self._real_time_server.invoke_unary_unary(
+ _application_testing_common.FIRST_SERVICE_UNUN, (),
+ _application_common.ERRONEOUS_UNARY_UNARY_REQUEST, None)
+ initial_metadata = rpc.initial_metadata()
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertIsNot(code, grpc.StatusCode.OK)
+
+ def test_infinite_request_stream_real_time(self):
+ rpc = self._real_time_server.invoke_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN, (),
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ initial_metadata = rpc.initial_metadata()
+ self._real_time.sleep_for(
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
+
+ def test_infinite_request_stream_fake_time(self):
+ rpc = self._fake_time_server.invoke_stream_unary(
+ _application_testing_common.FIRST_SERVICE_STREUN, (),
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ initial_metadata = rpc.initial_metadata()
+ self._fake_time.sleep_for(
+ _application_common.INFINITE_REQUEST_STREAM_TIMEOUT * 2)
+ rpc.send_request(_application_common.STREAM_UNARY_REQUEST)
+ response, trailing_metadata, code, details = rpc.termination()
+
+ self.assertIs(code, grpc.StatusCode.DEADLINE_EXCEEDED)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index c10719b86f..d61297b918 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -10,6 +10,7 @@
"protoc_plugin.beta_python_plugin_test.PythonPluginTest",
"reflection._reflection_servicer_test.ReflectionServicerTest",
"testing._client_test.ClientTest",
+ "testing._server_test.FirstServiceServicerTest",
"testing._time_test.StrictFakeTimeTest",
"testing._time_test.StrictRealTimeTest",
"unit._api_test.AllTest",
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml
index 05db404582..32b84b8de0 100644
--- a/src/ruby/.rubocop_todo.yml
+++ b/src/ruby/.rubocop_todo.yml
@@ -1,44 +1,569 @@
-# This configuration was generated by `rubocop --auto-gen-config`
-# on 2015-05-22 13:23:34 -0700 using RuboCop version 0.30.1.
+# This configuration was generated by
+# `rubocop --auto-gen-config`
+# on 2017-09-04 17:00:36 +0200 using RuboCop version 0.49.1.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
# versions of RuboCop, may require this file to be generated again.
-# Offense count: 30
-Metrics/AbcSize:
- Max: 38
+# Offense count: 3
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, IndentOneStep, IndentationWidth.
+# SupportedStyles: case, end
+Layout/CaseIndentation:
+ Exclude:
+ - 'tools/platform_check.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+Layout/CommentIndentation:
+ Exclude:
+ - 'qps/client.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+Layout/EmptyLineAfterMagicComment:
+ Exclude:
+ - 'tools/grpc-tools.gemspec'
+
+# Offense count: 33
+# Cop supports --auto-correct.
+# Configuration parameters: AllowAdjacentOneLineDefs, NumberOfEmptyLines.
+Layout/EmptyLineBetweenDefs:
+ Exclude:
+ - 'qps/client.rb'
+ - 'qps/histogram.rb'
+ - 'qps/proxy-worker.rb'
+ - 'qps/server.rb'
+ - 'qps/worker.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+Layout/EmptyLines:
+ Exclude:
+ - 'qps/qps-common.rb'
+
+# Offense count: 8
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: empty_lines, empty_lines_except_namespace, empty_lines_special, no_empty_lines
+Layout/EmptyLinesAroundClassBody:
+ Exclude:
+ - 'pb/grpc/testing/duplicate/echo_duplicate_services_pb.rb'
+ - 'pb/grpc/testing/metrics_services_pb.rb'
+ - 'pb/src/proto/grpc/testing/test_services_pb.rb'
+ - 'qps/src/proto/grpc/testing/proxy-service_services_pb.rb'
+ - 'qps/src/proto/grpc/testing/services_services_pb.rb'
+
+# Offense count: 28
+# Cop supports --auto-correct.
+# Configuration parameters: AllowForAlignment, ForceEqualSignAlignment.
+Layout/ExtraSpacing:
+ Enabled: false
+
+# Offense count: 1
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: normal, rails
+Layout/IndentationConsistency:
+ Exclude:
+ - 'pb/grpc/health/checker.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+# Configuration parameters: Width, IgnoredPatterns.
+Layout/IndentationWidth:
+ Exclude:
+ - 'pb/grpc/health/checker.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: symmetrical, new_line, same_line
+Layout/MultilineHashBraceLayout:
+ Exclude:
+ - 'spec/generic/active_call_spec.rb'
+
+# Offense count: 70
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: symmetrical, new_line, same_line
+Layout/MultilineMethodCallBraceLayout:
+ Enabled: false
+
+# Offense count: 2
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, IndentationWidth.
+# SupportedStyles: aligned, indented, indented_relative_to_receiver
+Layout/MultilineMethodCallIndentation:
+ Exclude:
+ - 'spec/generic/rpc_desc_spec.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: symmetrical, new_line, same_line
+Layout/MultilineMethodDefinitionBraceLayout:
+ Exclude:
+ - 'spec/generic/client_stub_spec.rb'
+
+# Offense count: 5
+# Cop supports --auto-correct.
+Layout/SpaceAfterColon:
+ Exclude:
+ - 'lib/grpc/generic/rpc_server.rb'
+
+# Offense count: 7
+# Cop supports --auto-correct.
+Layout/SpaceAfterComma:
+ Exclude:
+ - 'qps/client.rb'
+
+# Offense count: 27
+# Cop supports --auto-correct.
+# Configuration parameters: AllowForAlignment.
+Layout/SpaceAroundOperators:
+ Exclude:
+ - 'qps/client.rb'
+ - 'qps/histogram.rb'
+ - 'qps/proxy-worker.rb'
+ - 'qps/server.rb'
+ - 'spec/generic/active_call_spec.rb'
+ - 'spec/generic/rpc_server_spec.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, EnforcedStyleForEmptyBraces, SupportedStylesForEmptyBraces, SpaceBeforeBlockParameters.
+# SupportedStyles: space, no_space
+# SupportedStylesForEmptyBraces: space, no_space
+Layout/SpaceInsideBlockBraces:
+ Exclude:
+ - 'stress/stress_client.rb'
+
+# Offense count: 4
+# Cop supports --auto-correct.
+Layout/SpaceInsideBrackets:
+ Exclude:
+ - 'tools/bin/grpc_tools_ruby_protoc'
+ - 'tools/bin/grpc_tools_ruby_protoc_plugin'
+
+# Offense count: 2
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, EnforcedStyleForEmptyBraces, SupportedStylesForEmptyBraces.
+# SupportedStyles: space, no_space, compact
+# SupportedStylesForEmptyBraces: space, no_space
+Layout/SpaceInsideHashLiteralBraces:
+ Exclude:
+ - 'qps/server.rb'
+
+# Offense count: 6
+# Cop supports --auto-correct.
+Layout/SpaceInsidePercentLiteralDelimiters:
+ Exclude:
+ - 'spec/generic/client_stub_spec.rb'
+ - 'tools/grpc-tools.gemspec'
# Offense count: 3
-# Configuration parameters: CountComments.
-Metrics/ClassLength:
- Max: 200
+# Cop supports --auto-correct.
+Layout/Tab:
+ Exclude:
+ - 'pb/grpc/health/checker.rb'
+ - 'qps/client.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+Layout/TrailingWhitespace:
+ Exclude:
+ - 'qps/worker.rb'
+
+# Offense count: 1
+Lint/IneffectiveAccessModifier:
+ Exclude:
+ - 'lib/grpc/generic/active_call.rb'
+
+# Offense count: 4
+# Cop supports --auto-correct.
+Lint/PercentStringArray:
+ Exclude:
+ - 'spec/client_server_spec.rb'
+ - 'spec/generic/active_call_spec.rb'
+ - 'spec/generic/client_stub_spec.rb'
+
+# Offense count: 4
+Lint/ScriptPermission:
+ Exclude:
+ - 'qps/client.rb'
+ - 'qps/histogram.rb'
+ - 'qps/qps-common.rb'
+ - 'qps/server.rb'
+
+# Offense count: 2
+# Cop supports --auto-correct.
+# Configuration parameters: IgnoreEmptyBlocks, AllowUnusedKeywordArguments.
+Lint/UnusedBlockArgument:
+ Exclude:
+ - 'qps/client.rb'
+
+# Offense count: 2
+# Cop supports --auto-correct.
+# Configuration parameters: AllowUnusedKeywordArguments, IgnoreEmptyMethods.
+Lint/UnusedMethodArgument:
+ Exclude:
+ - 'qps/client.rb'
+
+# Offense count: 1
+# Configuration parameters: ContextCreatingMethods, MethodCreatingMethods.
+Lint/UselessAccessModifier:
+ Exclude:
+ - 'lib/grpc/logconfig.rb'
+
+# Offense count: 1
+Lint/UselessAssignment:
+ Exclude:
+ - 'qps/client.rb'
-# Offense count: 35
+# Offense count: 4
+Lint/Void:
+ Exclude:
+ - 'stress/metrics_server.rb'
+ - 'stress/stress_client.rb'
+
+# Offense count: 53
+Metrics/AbcSize:
+ Max: 57
+
+# Offense count: 81
+# Configuration parameters: CountComments, ExcludedMethods.
+Metrics/BlockLength:
+ Max: 715
+
+# Offense count: 82
+# Configuration parameters: AllowHeredoc, AllowURI, URISchemes, IgnoreCopDirectives, IgnoredPatterns.
+# URISchemes: http, https
+Metrics/LineLength:
+ Max: 141
+
+# Offense count: 82
# Configuration parameters: CountComments.
Metrics/MethodLength:
- Max: 36
+ Max: 54
-# Offense count: 7
+# Offense count: 5
# Configuration parameters: CountKeywordArgs.
Metrics/ParameterLists:
- Max: 8
+ Max: 7
+
+# Offense count: 1
+# Cop supports --auto-correct.
+Performance/RedundantBlockCall:
+ Exclude:
+ - 'spec/generic/client_stub_spec.rb'
+
+# Offense count: 5
+# Cop supports --auto-correct.
+# Configuration parameters: MaxKeyValuePairs.
+Performance/RedundantMerge:
+ Exclude:
+ - 'spec/generic/active_call_spec.rb'
+ - 'spec/generic/client_stub_spec.rb'
+
+# Offense count: 8
+# Cop supports --auto-correct.
+Performance/TimesMap:
+ Exclude:
+ - 'spec/channel_spec.rb'
+ - 'spec/client_server_spec.rb'
+ - 'spec/server_spec.rb'
+
+# Offense count: 7
+Style/AccessorMethodName:
+ Exclude:
+ - 'qps/server.rb'
+ - 'stress/metrics_server.rb'
+ - 'stress/stress_client.rb'
+
+# Offense count: 2
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: prefer_alias, prefer_alias_method
+Style/Alias:
+ Exclude:
+ - 'lib/grpc/generic/rpc_server.rb'
+ - 'lib/grpc/notifier.rb'
+
+# Offense count: 7
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, ProceduralMethods, FunctionalMethods, IgnoredMethods.
+# SupportedStyles: line_count_based, semantic, braces_for_chaining
+# ProceduralMethods: benchmark, bm, bmbm, create, each_with_object, measure, new, realtime, tap, with_object
+# FunctionalMethods: let, let!, subject, watch
+# IgnoredMethods: lambda, proc, it
+Style/BlockDelimiters:
+ Exclude:
+ - 'qps/client.rb'
+ - 'qps/proxy-worker.rb'
+ - 'qps/server.rb'
+ - 'qps/worker.rb'
+
+# Offense count: 2
+# Cop supports --auto-correct.
+Style/ClassMethods:
+ Exclude:
+ - 'tools/platform_check.rb'
+
+# Offense count: 2
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, SingleLineConditionsOnly, IncludeTernaryExpressions.
+# SupportedStyles: assign_to_condition, assign_inside_condition
+Style/ConditionalAssignment:
+ Exclude:
+ - 'lib/grpc/generic/rpc_server.rb'
+ - 'lib/grpc/generic/service.rb'
+
+# Offense count: 19
+Style/Documentation:
+ Exclude:
+ - 'spec/**/*'
+ - 'test/**/*'
+ - 'pb/grpc/testing/duplicate/echo_duplicate_services_pb.rb'
+ - 'pb/grpc/testing/metrics_services_pb.rb'
+ - 'pb/src/proto/grpc/testing/test_pb.rb'
+ - 'qps/client.rb'
+ - 'qps/histogram.rb'
+ - 'qps/proxy-worker.rb'
+ - 'qps/server.rb'
+ - 'qps/src/proto/grpc/testing/proxy-service_services_pb.rb'
+ - 'qps/src/proto/grpc/testing/services_pb.rb'
+ - 'qps/src/proto/grpc/testing/services_services_pb.rb'
+ - 'qps/worker.rb'
+ - 'stress/metrics_server.rb'
+ - 'stress/stress_client.rb'
+ - 'tools/platform_check.rb'
-# Offense count: 9
+# Offense count: 8
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: compact, expanded
+Style/EmptyMethod:
+ Exclude:
+ - 'bin/noproto_server.rb'
+ - 'lib/grpc/logconfig.rb'
+ - 'spec/generic/rpc_desc_spec.rb'
+
+# Offense count: 2
+# Configuration parameters: ExpectMatchingDefinition, Regex, IgnoreExecutableScripts, AllowedAcronyms.
+# AllowedAcronyms: CLI, DSL, ACL, API, ASCII, CPU, CSS, DNS, EOF, GUID, HTML, HTTP, HTTPS, ID, IP, JSON, LHS, QPS, RAM, RHS, RPC, SLA, SMTP, SQL, SSH, TCP, TLS, TTL, UDP, UI, UID, UUID, URI, URL, UTF8, VM, XML, XMPP, XSRF, XSS
+Style/FileName:
+ Exclude:
+ - 'qps/src/proto/grpc/testing/proxy-service_pb.rb'
+ - 'qps/src/proto/grpc/testing/proxy-service_services_pb.rb'
+
+# Offense count: 12
# Configuration parameters: AllowedVariables.
Style/GlobalVars:
- Enabled: false
+ Exclude:
+ - 'ext/grpc/extconf.rb'
+
+# Offense count: 3
+# Configuration parameters: MinBodyLength.
+Style/GuardClause:
+ Exclude:
+ - 'lib/grpc/generic/bidi_call.rb'
+ - 'lib/grpc/generic/rpc_server.rb'
+ - 'qps/client.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, UseHashRocketsWithSymbolValues, PreferHashRocketsForNonAlnumEndingSymbols.
+# SupportedStyles: ruby19, hash_rockets, no_mixed_keys, ruby19_no_mixed_keys
+Style/HashSyntax:
+ Exclude:
+ - 'stress/metrics_server.rb'
+
+# Offense count: 1
+Style/IfInsideElse:
+ Exclude:
+ - 'lib/grpc/generic/rpc_desc.rb'
+
+# Offense count: 4
+# Cop supports --auto-correct.
+# Configuration parameters: MaxLineLength.
+Style/IfUnlessModifier:
+ Exclude:
+ - 'ext/grpc/extconf.rb'
+ - 'qps/histogram.rb'
+ - 'stress/stress_client.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+Style/MethodCallWithoutArgsParentheses:
+ Exclude:
+ - 'qps/client.rb'
+
+# Offense count: 3
+# Cop supports --auto-correct.
+Style/MultilineIfModifier:
+ Exclude:
+ - 'lib/grpc/generic/bidi_call.rb'
+ - 'lib/grpc/generic/client_stub.rb'
+ - 'spec/spec_helper.rb'
+
+# Offense count: 7
+# Cop supports --auto-correct.
+Style/MutableConstant:
+ Exclude:
+ - 'ext/grpc/extconf.rb'
+ - 'lib/grpc/version.rb'
+ - 'spec/compression_options_spec.rb'
+ - 'spec/generic/active_call_spec.rb'
+ - 'tools/version.rb'
# Offense count: 1
-# Configuration parameters: EnforcedStyle, MinBodyLength, SupportedStyles.
-Style/Next:
+# Cop supports --auto-correct.
+Style/NegatedWhile:
+ Exclude:
+ - 'qps/client.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+# Configuration parameters: AutoCorrect, EnforcedStyle, SupportedStyles.
+# SupportedStyles: predicate, comparison
+Style/NumericPredicate:
+ Exclude:
+ - 'spec/**/*'
+ - 'ext/grpc/extconf.rb'
+
+# Offense count: 7
+# Cop supports --auto-correct.
+Style/ParallelAssignment:
+ Exclude:
+ - 'bin/math_server.rb'
+ - 'lib/grpc/generic/rpc_server.rb'
+ - 'spec/generic/client_stub_spec.rb'
+ - 'spec/generic/rpc_desc_spec.rb'
+ - 'spec/generic/rpc_server_pool_spec.rb'
+ - 'spec/generic/rpc_server_spec.rb'
+
+# Offense count: 8
+# Cop supports --auto-correct.
+# Configuration parameters: PreferredDelimiters.
+Style/PercentLiteralDelimiters:
+ Exclude:
+ - 'end2end/grpc_class_init_driver.rb'
+ - 'spec/client_server_spec.rb'
+ - 'spec/generic/active_call_spec.rb'
+ - 'spec/generic/client_stub_spec.rb'
+ - 'tools/grpc-tools.gemspec'
+
+# Offense count: 3
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: compact, exploded
+Style/RaiseArgs:
+ Exclude:
+ - 'stress/metrics_server.rb'
+
+# Offense count: 4
+# Cop supports --auto-correct.
+Style/RedundantParentheses:
+ Exclude:
+ - 'lib/grpc/generic/rpc_server.rb'
+ - 'qps/client.rb'
+ - 'qps/proxy-worker.rb'
+ - 'spec/generic/rpc_desc_spec.rb'
+
+# Offense count: 5
+# Cop supports --auto-correct.
+# Configuration parameters: AllowMultipleReturnValues.
+Style/RedundantReturn:
+ Exclude:
+ - 'end2end/grpc_class_init_client.rb'
+
+# Offense count: 77
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: only_raise, only_fail, semantic
+Style/SignalException:
Enabled: false
# Offense count: 2
-# Configuration parameters: Methods.
-Style/SingleLineBlockParams:
- Enabled: false
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles.
+# SupportedStyles: use_perl_names, use_english_names
+Style/SpecialGlobalVars:
+ Exclude:
+ - 'ext/grpc/extconf.rb'
+ - 'stress/stress_client.rb'
+
+# Offense count: 189
+# Cop supports --auto-correct.
+# Configuration parameters: EnforcedStyle, SupportedStyles, ConsistentQuotesInMultiline.
+# SupportedStyles: single_quotes, double_quotes
+Style/StringLiterals:
+ Exclude:
+ - 'pb/grpc/testing/metrics_pb.rb'
+ - 'pb/src/proto/grpc/testing/empty_pb.rb'
+ - 'pb/src/proto/grpc/testing/messages_pb.rb'
+ - 'qps/proxy-worker.rb'
+ - 'qps/server.rb'
+ - 'qps/src/proto/grpc/testing/control_pb.rb'
+ - 'qps/src/proto/grpc/testing/messages_pb.rb'
+ - 'qps/src/proto/grpc/testing/payloads_pb.rb'
+ - 'qps/src/proto/grpc/testing/proxy-service_pb.rb'
+ - 'qps/src/proto/grpc/testing/stats_pb.rb'
+ - 'qps/worker.rb'
# Offense count: 1
Style/StructInheritance:
- Enabled: false
+ Exclude:
+ - 'lib/grpc/generic/rpc_desc.rb'
+
+# Offense count: 10
+# Cop supports --auto-correct.
+# Configuration parameters: MinSize, SupportedStyles.
+# SupportedStyles: percent, brackets
+Style/SymbolArray:
+ EnforcedStyle: brackets
+
+# Offense count: 2
+# Cop supports --auto-correct.
+# Configuration parameters: IgnoredMethods.
+# IgnoredMethods: respond_to, define_method
+Style/SymbolProc:
+ Exclude:
+ - 'qps/client.rb'
+ - 'stress/stress_client.rb'
+
+# Offense count: 6
+# Cop supports --auto-correct.
+# Configuration parameters: AllowNamedUnderscoreVariables.
+Style/TrailingUnderscoreVariable:
+ Exclude:
+ - 'spec/channel_credentials_spec.rb'
+ - 'spec/server_credentials_spec.rb'
+
+# Offense count: 3
+# Cop supports --auto-correct.
+# Configuration parameters: ExactNameMatch, AllowPredicates, AllowDSLWriters, IgnoreClassMethods, Whitelist.
+# Whitelist: to_ary, to_a, to_c, to_enum, to_h, to_hash, to_i, to_int, to_io, to_open, to_path, to_proc, to_r, to_regexp, to_str, to_s, to_sym
+Style/TrivialAccessors:
+ Exclude:
+ - 'qps/histogram.rb'
+
+# Offense count: 3
+# Cop supports --auto-correct.
+Style/UnneededInterpolation:
+ Exclude:
+ - 'pb/grpc/health/checker.rb'
+
+# Offense count: 1
+# Cop supports --auto-correct.
+Style/YodaCondition:
+ Exclude:
+ - 'stress/stress_client.rb'
+
+# Offense count: 2
+# Cop supports --auto-correct.
+Style/ZeroLengthPredicate:
+ Exclude:
+ - 'lib/grpc/generic/rpc_server.rb'
diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb
index b9e3d6054f..c35719a71f 100755
--- a/src/ruby/end2end/grpc_class_init_client.rb
+++ b/src/ruby/end2end/grpc_class_init_client.rb
@@ -41,7 +41,7 @@ def run_gc_stress_test(test_proc)
GC.enable
construct_many(test_proc)
- GC.start(full_mark: true, immediate_sweep: true)
+ GC.start
construct_many(test_proc)
end
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 74f189e1e0..5550bb7d5f 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -905,6 +905,9 @@ static void Init_grpc_error_codes() {
rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS",
UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS));
+ /* Hint the GC that this is a global and shouldn't be sweeped. */
+ rb_global_variable(&rb_error_code_details);
+
/* Add the detail strings to a Hash */
rb_error_code_details = rb_hash_new();
rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_OK),