From 2f2175c207fab465db52184eb9a9c492026ac471 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 3 Oct 2017 14:25:08 -0700 Subject: Counter for number of failed trylocks in cq --- src/core/lib/debug/stats_data.c | 3 +++ src/core/lib/debug/stats_data.h | 4 ++++ src/core/lib/debug/stats_data.yaml | 4 ++++ src/core/lib/debug/stats_data_bq_schema.sql | 3 ++- src/core/lib/surface/completion_queue.c | 4 ++++ 5 files changed, 17 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index c0aec63c1d..41d9e95e29 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -111,6 +111,7 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_push_retries", "server_requested_calls", "server_slowpath_requests_queued", + "cq_failed_queue_trylocks", }; const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", @@ -220,6 +221,8 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "How many calls were requested (not necessarily received) by the server", "How many times was the server slow path taken (indicates too few " "outstanding requests)", + "Number of lock (trylock) acquisition failures on completion queue event " + "queue. High value here indicates high contention on completion queues", }; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "call_initial_size", diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 28dab00117..0a2b323bdf 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -113,6 +113,7 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, + GRPC_STATS_COUNTER_CQ_FAILED_QUEUE_TRYLOCKS, GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; @@ -417,6 +418,9 @@ typedef enum { #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED) +#define GRPC_STATS_INC_CQ_FAILED_QUEUE_TRYLOCKS(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_FAILED_QUEUE_TRYLOCKS) #define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \ grpc_stats_inc_call_initial_size((exec_ctx), (int)(value)) void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x); diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index b5c15ff55c..5efe353200 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -270,3 +270,7 @@ - counter: server_slowpath_requests_queued doc: How many times was the server slow path taken (indicates too few outstanding requests) +# cq +- counter: cq_failed_queue_trylocks + doc: Number of lock (trylock) acquisition failures on completion queue event + queue. High value here indicates high contention on completion queues diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index f96e40c00e..6ad58e2530 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -85,4 +85,5 @@ executor_wakeup_initiated_per_iteration:FLOAT, executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, -server_slowpath_requests_queued_per_iteration:FLOAT +server_slowpath_requests_queued_per_iteration:FLOAT, +cq_failed_queue_trylocks_per_iteration:FLOAT diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index fed66e3a20..037b10e848 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -378,6 +378,10 @@ static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { if (gpr_spinlock_trylock(&q->queue_lock)) { c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); gpr_spinlock_unlock(&q->queue_lock); + } else { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_STATS_INC_CQ_FAILED_QUEUE_TRYLOCKS(&exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); } if (c) { -- cgit v1.2.3 From 0d0fa06488e09bb404511af15523f2e75a2e86c8 Mon Sep 17 00:00:00 2001 From: Sree Kuchibhotla Date: Tue, 3 Oct 2017 16:04:42 -0700 Subject: Add more counters in cq --- src/core/lib/debug/stats_data.c | 8 +++++++- src/core/lib/debug/stats_data.h | 16 +++++++++++---- src/core/lib/debug/stats_data.yaml | 8 +++++++- src/core/lib/debug/stats_data_bq_schema.sql | 4 +++- src/core/lib/surface/completion_queue.c | 17 +++++++++++---- tools/run_tests/performance/massage_qps_stats.py | 4 +++- .../performance/scenario_result_schema.json | 24 ++++++++++++++++++++-- 7 files changed, 67 insertions(+), 14 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c index 41d9e95e29..a9ffaecfb9 100644 --- a/src/core/lib/debug/stats_data.c +++ b/src/core/lib/debug/stats_data.c @@ -111,7 +111,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_push_retries", "server_requested_calls", "server_slowpath_requests_queued", - "cq_failed_queue_trylocks", + "cq_ev_queue_trylock_failures", + "cq_ev_queue_trylock_successes", + "cq_ev_queue_transient_pop_failures", }; const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", @@ -223,6 +225,10 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "outstanding requests)", "Number of lock (trylock) acquisition failures on completion queue event " "queue. High value here indicates high contention on completion queues", + "Number of lock (trylock) acquisition successes on completion queue event " + "queue.", + "Number of times NULL was popped out of completion queue's event queue " + "even though the event queue was not empty", }; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "call_initial_size", diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 0a2b323bdf..86696cc438 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -113,7 +113,9 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, - GRPC_STATS_COUNTER_CQ_FAILED_QUEUE_TRYLOCKS, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES, GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; @@ -418,9 +420,15 @@ typedef enum { #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED) -#define GRPC_STATS_INC_CQ_FAILED_QUEUE_TRYLOCKS(exec_ctx) \ - GRPC_STATS_INC_COUNTER((exec_ctx), \ - GRPC_STATS_COUNTER_CQ_FAILED_QUEUE_TRYLOCKS) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES) #define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \ grpc_stats_inc_call_initial_size((exec_ctx), (int)(value)) void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x); diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 5efe353200..e6602e1396 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -271,6 +271,12 @@ doc: How many times was the server slow path taken (indicates too few outstanding requests) # cq -- counter: cq_failed_queue_trylocks +- counter: cq_ev_queue_trylock_failures doc: Number of lock (trylock) acquisition failures on completion queue event queue. High value here indicates high contention on completion queues +- counter: cq_ev_queue_trylock_successes + doc: Number of lock (trylock) acquisition successes on completion queue event + queue. +- counter: cq_ev_queue_transient_pop_failures + doc: Number of times NULL was popped out of completion queue's event queue + even though the event queue was not empty diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 6ad58e2530..0f70a52a6c 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -86,4 +86,6 @@ executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, server_slowpath_requests_queued_per_iteration:FLOAT, -cq_failed_queue_trylocks_per_iteration:FLOAT +cq_ev_queue_trylock_failures_per_iteration:FLOAT, +cq_ev_queue_trylock_successes_per_iteration:FLOAT, +cq_ev_queue_transient_pop_failures_per_iteration:FLOAT diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 037b10e848..689b51cfbe 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -375,15 +375,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { grpc_cq_completion *c = NULL; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (gpr_spinlock_trylock(&q->queue_lock)) { - c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx); + + bool is_empty = false; + c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty); gpr_spinlock_unlock(&q->queue_lock); + + if (c == NULL && !is_empty) { + GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx); + } } else { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GRPC_STATS_INC_CQ_FAILED_QUEUE_TRYLOCKS(&exec_ctx); - grpc_exec_ctx_finish(&exec_ctx); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx); } + grpc_exec_ctx_finish(&exec_ctx); + if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); } diff --git a/tools/run_tests/performance/massage_qps_stats.py b/tools/run_tests/performance/massage_qps_stats.py index 49d165d94f..ad8585c27e 100644 --- a/tools/run_tests/performance/massage_qps_stats.py +++ b/tools/run_tests/performance/massage_qps_stats.py @@ -108,7 +108,9 @@ def massage_qps_stats(scenario_result): stats["core_executor_push_retries"] = massage_qps_stats_helpers.counter(core_stats, "executor_push_retries") stats["core_server_requested_calls"] = massage_qps_stats_helpers.counter(core_stats, "server_requested_calls") stats["core_server_slowpath_requests_queued"] = massage_qps_stats_helpers.counter(core_stats, "server_slowpath_requests_queued") - stats["core_cq_failed_queue_trylocks"] = massage_qps_stats_helpers.counter(core_stats, "cq_failed_queue_trylocks") + stats["core_cq_ev_queue_trylock_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_failures") + stats["core_cq_ev_queue_trylock_successes"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_successes") + stats["core_cq_ev_queue_transient_pop_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_transient_pop_failures") h = massage_qps_stats_helpers.histogram(core_stats, "call_initial_size") stats["core_call_initial_size"] = ",".join("%f" % x for x in h.buckets) stats["core_call_initial_size_bkts"] = ",".join("%f" % x for x in h.boundaries) diff --git a/tools/run_tests/performance/scenario_result_schema.json b/tools/run_tests/performance/scenario_result_schema.json index 11d8dba5d3..099048c297 100644 --- a/tools/run_tests/performance/scenario_result_schema.json +++ b/tools/run_tests/performance/scenario_result_schema.json @@ -552,7 +552,17 @@ }, { "mode": "NULLABLE", - "name": "core_cq_failed_queue_trylocks", + "name": "core_cq_ev_queue_trylock_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_successes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_transient_pop_failures", "type": "INTEGER" }, { @@ -1349,7 +1359,17 @@ }, { "mode": "NULLABLE", - "name": "core_cq_failed_queue_trylocks", + "name": "core_cq_ev_queue_trylock_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_successes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_transient_pop_failures", "type": "INTEGER" }, { -- cgit v1.2.3 From 97b6e5db9f91f974a9655cbd591046d8c3b61a07 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 9 Oct 2017 08:31:41 -0700 Subject: Process updates immediately instead of waiting for a previous one to complete. --- .../client_channel/lb_policy/grpclb/grpclb.cc | 55 +++------------------- 1 file changed, 7 insertions(+), 48 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc index 53fa0fff04..ffd58129c6 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc @@ -345,9 +345,6 @@ typedef struct glb_lb_policy { /** are we currently updating lb_call? */ bool updating_lb_call; - /** are we currently updating lb_channel? */ - bool updating_lb_channel; - /** are we already watching the LB channel's connectivity? */ bool watching_lb_channel; @@ -360,9 +357,6 @@ typedef struct glb_lb_policy { /** called upon changes to the LB channel's connectivity. */ grpc_closure lb_channel_on_connectivity_changed; - /** args from the latest update received while already updating, or NULL */ - grpc_lb_policy_args *pending_update_args; - /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ @@ -982,10 +976,6 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } grpc_fake_resolver_response_generator_unref(glb_policy->response_generator); grpc_subchannel_index_unref(); - if (glb_policy->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, glb_policy->pending_update_args->args); - gpr_free(glb_policy->pending_update_args); - } gpr_free(glb_policy); } @@ -1752,45 +1742,22 @@ static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } const grpc_lb_addresses *addresses = (const grpc_lb_addresses *)arg->value.pointer.p; - + // If a non-empty serverlist hasn't been received from the balancer, + // propagate the update to fallback_backend_addresses. if (glb_policy->serverlist == NULL) { - // If a non-empty serverlist hasn't been received from the balancer, - // propagate the update to fallback_backend_addresses. fallback_update_locked(exec_ctx, glb_policy, addresses); - } else if (glb_policy->updating_lb_channel) { - // If we have recieved serverlist from the balancer, we need to defer update - // when there is an in-progress one. - if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, - "Update already in progress for grpclb %p. Deferring update.", - (void *)glb_policy); - } - if (glb_policy->pending_update_args != NULL) { - grpc_channel_args_destroy(exec_ctx, - glb_policy->pending_update_args->args); - gpr_free(glb_policy->pending_update_args); - } - glb_policy->pending_update_args = (grpc_lb_policy_args *)gpr_zalloc( - sizeof(*glb_policy->pending_update_args)); - glb_policy->pending_update_args->client_channel_factory = - args->client_channel_factory; - glb_policy->pending_update_args->args = grpc_channel_args_copy(args->args); - glb_policy->pending_update_args->combiner = args->combiner; - return; } - - glb_policy->updating_lb_channel = true; GPR_ASSERT(glb_policy->lb_channel != NULL); + // Propagate updates to the LB channel (pick_first) through the fake + // resolver. grpc_channel_args *lb_channel_args = build_lb_channel_args( exec_ctx, addresses, glb_policy->response_generator, args->args); - /* Propagate updates to the LB channel (pick first) through the fake resolver - */ grpc_fake_resolver_response_generator_set_response( exec_ctx, glb_policy->response_generator, lb_channel_args); grpc_channel_args_destroy(exec_ctx, lb_channel_args); - + // Start watching the LB channel connectivity for connection, if not + // already doing so. if (!glb_policy->watching_lb_channel) { - // Watch the LB channel connectivity for connection. glb_policy->lb_channel_connectivity = grpc_channel_check_connectivity_state( glb_policy->lb_channel, true /* try to connect */); grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element( @@ -1842,18 +1809,10 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, /* fallthrough */ case GRPC_CHANNEL_READY: if (glb_policy->lb_call != NULL) { - glb_policy->updating_lb_channel = false; glb_policy->updating_lb_call = true; grpc_call_cancel(glb_policy->lb_call, NULL); - // lb_on_server_status_received will pick up the cancel and reinit + // lb_on_server_status_received() will pick up the cancel and reinit // lb_call. - if (glb_policy->pending_update_args != NULL) { - grpc_lb_policy_args *args = glb_policy->pending_update_args; - glb_policy->pending_update_args = NULL; - glb_update_locked(exec_ctx, &glb_policy->base, args); - grpc_channel_args_destroy(exec_ctx, args->args); - gpr_free(args); - } } else if (glb_policy->started_picking && !glb_policy->shutting_down) { if (glb_policy->retry_timer_active) { grpc_timer_cancel(exec_ctx, &glb_policy->lb_call_retry_timer); -- cgit v1.2.3 From 1ba537069ca8ac3b97502f66e815f9e84a73acbc Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 11 Oct 2017 10:10:23 -0700 Subject: Sanity check that tests declared non-polling actually don't poll --- src/core/lib/iomgr/ev_posix.cc | 18 ++++++++++++++++++ tools/run_tests/generated/tests.json | 3 ++- tools/run_tests/run_tests.py | 2 +- 3 files changed, 21 insertions(+), 2 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index e4033fab1d..369baa621f 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -61,12 +61,30 @@ typedef struct { event_engine_factory_fn factory; } event_engine_factory; +namespace { +extern "C" { +int dummypoll(struct pollfd fds[], nfds_t nfds, int timeout) { + gpr_log(GPR_ERROR, "Attempted to poll despite declaring non-polling."); + GPR_ASSERT(false); + return -1; +} +} // extern "C" + +const grpc_event_engine_vtable *init_non_polling(bool explicit_request) { + // return the simplest engine as a dummy but also override the poller + auto ret = grpc_init_poll_posix(explicit_request); + grpc_poll_function = dummypoll; + return ret; +} +} // namespace + static const event_engine_factory g_factories[] = { {"epoll1", grpc_init_epoll1_linux}, {"epollsig", grpc_init_epollsig_linux}, {"poll", grpc_init_poll_posix}, {"poll-cv", grpc_init_poll_cv_posix}, {"epollex", grpc_init_epollex_linux}, + {"none", init_non_polling}, }; static void add(const char *beg, const char *end, char ***ss, size_t *ns) { diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 4db823e003..91c7d4c38e 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3811,7 +3811,8 @@ "mac", "posix", "windows" - ] + ], + "uses_polling": true }, { "args": [], diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index 7c65067857..ea21c81875 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -286,7 +286,7 @@ class CLanguage(object): continue polling_strategies = (_POLLING_STRATEGIES.get(self.platform, ['all']) if target.get('uses_polling', True) - else ['all']) + else ['none']) if self.args.iomgr_platform == 'uv': polling_strategies = ['all'] for polling_strategy in polling_strategies: -- cgit v1.2.3 From 147a45ae8037332b15ec1e9ce9259aa8be6c915a Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 11 Oct 2017 15:19:06 -0700 Subject: Allow zero-duration polls in non-poller --- src/core/lib/iomgr/ev_posix.cc | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 369baa621f..3a1dd8d30b 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -62,18 +62,31 @@ typedef struct { } event_engine_factory; namespace { + extern "C" { -int dummypoll(struct pollfd fds[], nfds_t nfds, int timeout) { - gpr_log(GPR_ERROR, "Attempted to poll despite declaring non-polling."); - GPR_ASSERT(false); - return -1; + +grpc_poll_function_type real_poll_function; + +int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) { + if (timeout == 0) { + return real_poll_function(fds, nfds, 0); + } else { + gpr_log(GPR_ERROR, "Attempted a blocking poll when declared non-polling."); + GPR_ASSERT(false); + return -1; + } } } // extern "C" const grpc_event_engine_vtable *init_non_polling(bool explicit_request) { + if (!explicit_request) { + return nullptr; + } // return the simplest engine as a dummy but also override the poller auto ret = grpc_init_poll_posix(explicit_request); - grpc_poll_function = dummypoll; + real_poll_function = grpc_poll_function; + grpc_poll_function = dummy_poll; + return ret; } } // namespace -- cgit v1.2.3 From d0703a65f17f34ded3556531ea06cd5c005aa0a9 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Wed, 11 Oct 2017 20:55:06 -0700 Subject: Get latency profiles back up and working --- src/core/lib/profiling/basic_timers.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src/core') diff --git a/src/core/lib/profiling/basic_timers.cc b/src/core/lib/profiling/basic_timers.cc index ab9d60481c..0ae7d7f600 100644 --- a/src/core/lib/profiling/basic_timers.cc +++ b/src/core/lib/profiling/basic_timers.cc @@ -209,9 +209,9 @@ static void init_output() { static void rotate_log() { /* Using malloc here, as this code could end up being called by gpr_malloc */ - gpr_timer_log *new = malloc(sizeof(*new)); + gpr_timer_log *log = static_cast(malloc(sizeof(*log))); gpr_once_init(&g_once_init, init_output); - new->num_entries = 0; + log->num_entries = 0; pthread_mutex_lock(&g_mu); if (g_thread_log != NULL) { timer_log_remove(&g_in_progress_logs, g_thread_log); @@ -221,9 +221,9 @@ static void rotate_log() { } else { g_thread_id = g_next_thread_id++; } - timer_log_push_back(&g_in_progress_logs, new); + timer_log_push_back(&g_in_progress_logs, log); pthread_mutex_unlock(&g_mu); - g_thread_log = new; + g_thread_log = log; } static void gpr_timers_log_add(const char *tagstr, marker_type type, -- cgit v1.2.3 From 4f0cd0e82c425533bef9f7ab8119cc542bcc9a41 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Fri, 22 Sep 2017 23:34:43 -0700 Subject: Add flow control to inproc transport so send needs a matching recv; fix some tests that assumed some sends could always go out --- src/core/ext/transport/inproc/inproc_transport.cc | 669 +++++++++------------ test/core/end2end/gen_build_yaml.py | 19 +- test/core/end2end/generate_tests.bzl | 20 +- test/core/end2end/tests/streaming_error_response.c | 1 - test/cpp/end2end/async_end2end_test.cc | 182 ++++-- tools/run_tests/generated/tests.json | 92 --- 6 files changed, 445 insertions(+), 538 deletions(-) (limited to 'src/core') diff --git a/src/core/ext/transport/inproc/inproc_transport.cc b/src/core/ext/transport/inproc/inproc_transport.cc index 1001d74c22..67a8358927 100644 --- a/src/core/ext/transport/inproc/inproc_transport.cc +++ b/src/core/ext/transport/inproc/inproc_transport.cc @@ -62,96 +62,22 @@ typedef struct inproc_transport { struct inproc_stream *stream_list; } inproc_transport; -typedef struct sb_list_entry { - grpc_slice_buffer sb; - struct sb_list_entry *next; -} sb_list_entry; - -// Specialize grpc_byte_stream for our use case -typedef struct { - grpc_byte_stream base; - sb_list_entry *le; - grpc_error *shutdown_error; -} inproc_slice_byte_stream; - -typedef struct { - // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases - sb_list_entry *head; - sb_list_entry *tail; -} slice_buffer_list; - -static void slice_buffer_list_init(slice_buffer_list *l) { - l->head = NULL; - l->tail = NULL; -} - -static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) { - grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb); - gpr_free(le); -} - -static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx, - slice_buffer_list *l) { - sb_list_entry *curr = l->head; - while (curr != NULL) { - sb_list_entry *le = curr; - curr = curr->next; - sb_list_entry_destroy(exec_ctx, le); - } - l->head = NULL; - l->tail = NULL; -} - -static bool slice_buffer_list_empty(slice_buffer_list *l) { - return l->head == NULL; -} - -static void slice_buffer_list_append_entry(slice_buffer_list *l, - sb_list_entry *next) { - next->next = NULL; - if (l->tail) { - l->tail->next = next; - l->tail = next; - } else { - l->head = next; - l->tail = next; - } -} - -static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) { - sb_list_entry *next = (sb_list_entry *)gpr_malloc(sizeof(*next)); - grpc_slice_buffer_init(&next->sb); - slice_buffer_list_append_entry(l, next); - return &next->sb; -} - -static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) { - sb_list_entry *ret = l->head; - l->head = l->head->next; - if (l->head == NULL) { - l->tail = NULL; - } - return ret; -} - typedef struct inproc_stream { inproc_transport *t; grpc_metadata_batch to_read_initial_md; uint32_t to_read_initial_md_flags; bool to_read_initial_md_filled; - slice_buffer_list to_read_message; grpc_metadata_batch to_read_trailing_md; bool to_read_trailing_md_filled; - bool reads_needed; - bool read_closure_scheduled; - grpc_closure read_closure; + bool ops_needed; + bool op_closure_scheduled; + grpc_closure op_closure; // Write buffer used only during gap at init time when client-side // stream is set up but server side stream is not yet set up grpc_metadata_batch write_buffer_initial_md; bool write_buffer_initial_md_filled; uint32_t write_buffer_initial_md_flags; grpc_millis write_buffer_deadline; - slice_buffer_list write_buffer_message; grpc_metadata_batch write_buffer_trailing_md; bool write_buffer_trailing_md_filled; grpc_error *write_buffer_cancel_error; @@ -164,11 +90,15 @@ typedef struct inproc_stream { gpr_arena *arena; + grpc_transport_stream_op_batch *send_message_op; + grpc_transport_stream_op_batch *send_trailing_md_op; grpc_transport_stream_op_batch *recv_initial_md_op; grpc_transport_stream_op_batch *recv_message_op; grpc_transport_stream_op_batch *recv_trailing_md_op; - inproc_slice_byte_stream recv_message_stream; + grpc_slice_buffer recv_message; + grpc_slice_buffer_stream recv_stream; + bool recv_inited; bool initial_md_sent; bool trailing_md_sent; @@ -187,54 +117,11 @@ typedef struct inproc_stream { struct inproc_stream *stream_list_next; } inproc_stream; -static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, size_t max, - grpc_closure *on_complete) { - // Because inproc transport always provides the entire message atomically, - // the byte stream always has data available when this function is called. - // Thus, this function always returns true (unlike other transports) and - // there is never any need to schedule a closure - return true; -} - -static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_slice *slice) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - if (stream->shutdown_error != GRPC_ERROR_NONE) { - return GRPC_ERROR_REF(stream->shutdown_error); - } - *slice = grpc_slice_buffer_take_first(&stream->le->sb); - return GRPC_ERROR_NONE; -} - -static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs, - grpc_error *error) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - GRPC_ERROR_UNREF(stream->shutdown_error); - stream->shutdown_error = error; -} - -static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *bs) { - inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; - sb_list_entry_destroy(exec_ctx, stream->le); - GRPC_ERROR_UNREF(stream->shutdown_error); -} - -static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { - inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, - inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; - -void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, - sb_list_entry *le) { - s->base.length = (uint32_t)le->sb.length; - s->base.flags = 0; - s->base.vtable = &inproc_slice_byte_stream_vtable; - s->le = le; - s->shutdown_error = GRPC_ERROR_NONE; -} +static grpc_closure do_nothing_closure; +static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, + grpc_error *error); +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void ref_transport(inproc_transport *t) { INPROC_LOG(GPR_DEBUG, "ref_transport %p", t); @@ -280,12 +167,14 @@ static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s, static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s); - slice_buffer_list_destroy(exec_ctx, &s->to_read_message); - slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message); GRPC_ERROR_UNREF(s->write_buffer_cancel_error); GRPC_ERROR_UNREF(s->cancel_self_error); GRPC_ERROR_UNREF(s->cancel_other_error); + if (s->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &s->recv_message); + } + unref_transport(exec_ctx, s->t); if (s->closure_at_destroy) { @@ -293,9 +182,6 @@ static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { } } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client, bool is_initial) { for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; @@ -359,11 +245,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->write_buffer_initial_md_filled = false; grpc_metadata_batch_init(&s->write_buffer_trailing_md); s->write_buffer_trailing_md_filled = false; - slice_buffer_list_init(&s->to_read_message); - slice_buffer_list_init(&s->write_buffer_message); - s->reads_needed = false; - s->read_closure_scheduled = false; - GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s, + s->ops_needed = false; + s->op_closure_scheduled = false; + GRPC_CLOSURE_INIT(&s->op_closure, op_state_machine, s, grpc_schedule_on_exec_ctx); s->t = t; s->closure_at_destroy = NULL; @@ -425,11 +309,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md); cs->write_buffer_initial_md_filled = false; } - while (!slice_buffer_list_empty(&cs->write_buffer_message)) { - slice_buffer_list_append_entry( - &s->to_read_message, - slice_buffer_list_pophead(&cs->write_buffer_message)); - } if (cs->write_buffer_trailing_md_filled) { fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0, &s->to_read_trailing_md, NULL, @@ -488,9 +367,39 @@ static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, } } +// Call the on_complete closure associated with this stream_op_batch if +// this stream_op_batch is only one of the pending operations for this +// stream. This is called when one of the pending operations for the stream +// is done and about to be NULLed out +static void complete_if_batch_end_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, grpc_error *error, + grpc_transport_stream_op_batch *op, + const char *msg) { + int is_sm = (int)(op == s->send_message_op); + int is_stm = (int)(op == s->send_trailing_md_op); + int is_rim = (int)(op == s->recv_initial_md_op); + int is_rm = (int)(op == s->recv_message_op); + int is_rtm = (int)(op == s->recv_trailing_md_op); + + if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) { + INPROC_LOG(GPR_DEBUG, "%s %p %p %p", msg, s, op, error); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, GRPC_ERROR_REF(error)); + } +} + +static void maybe_schedule_op_closure_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *s, + grpc_error *error) { + if (s && s->ops_needed && !s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_REF(error)); + s->op_closure_scheduled = true; + s->ops_needed = false; + } +} + static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { - INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p fail_helper", s); // If we're failing this side, we need to make sure that // we also send or have already sent trailing metadata if (!s->trailing_md_sent) { @@ -512,14 +421,7 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(error); } @@ -564,14 +466,9 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, err); // Last use of err so no need to REF and then UNREF it - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG(GPR_DEBUG, - "fail_helper %p scheduling initial-metadata-on-complete %p", - error, s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_initial_md_op, + "fail_helper scheduling recv-initial-metadata-on-complete"); s->recv_initial_md_op = NULL; } if (s->recv_message_op) { @@ -580,20 +477,30 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p", - s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(error)); - } + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_message_op, + "fail_helper scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if (s->send_message_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_message_op, + "fail_helper scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_trailing_md_op) { + complete_if_batch_end_locked( + exec_ctx, s, error, s->send_trailing_md_op, + "fail_helper scheduling send-trailng-md-on-complete"); + s->send_trailing_md_op = NULL; + } if (s->recv_trailing_md_op) { INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling trailing-md-on-complete %p", s, error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(error)); + complete_if_batch_end_locked( + exec_ctx, s, error, s->recv_trailing_md_op, + "fail_helper scheduling recv-trailing-metadata-on-complete"); s->recv_trailing_md_op = NULL; } close_other_side_locked(exec_ctx, s, "fail_helper:other_side"); @@ -602,12 +509,61 @@ static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, GRPC_ERROR_UNREF(error); } -static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void message_transfer_locked(grpc_exec_ctx *exec_ctx, + inproc_stream *sender, + inproc_stream *receiver) { + size_t remaining = + sender->send_message_op->payload->send_message.send_message->length; + if (receiver->recv_inited) { + grpc_slice_buffer_destroy_internal(exec_ctx, &receiver->recv_message); + } + grpc_slice_buffer_init(&receiver->recv_message); + receiver->recv_inited = true; + do { + grpc_slice message_slice; + grpc_closure unused; + GPR_ASSERT(grpc_byte_stream_next( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + SIZE_MAX, &unused)); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, sender->send_message_op->payload->send_message.send_message, + &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, sender, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); + remaining -= GRPC_SLICE_LENGTH(message_slice); + grpc_slice_buffer_add(&receiver->recv_message, message_slice); + } while (remaining > 0); + + grpc_slice_buffer_stream_init(&receiver->recv_stream, &receiver->recv_message, + 0); + *receiver->recv_message_op->payload->recv_message.recv_message = + &receiver->recv_stream.base; + INPROC_LOG(GPR_DEBUG, "message_transfer_locked %p scheduling message-ready", + receiver); + GRPC_CLOSURE_SCHED( + exec_ctx, + receiver->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, sender, GRPC_ERROR_NONE, sender->send_message_op, + "message_transfer scheduling sender on_complete"); + complete_if_batch_end_locked( + exec_ctx, receiver, GRPC_ERROR_NONE, receiver->recv_message_op, + "message_transfer scheduling receiver on_complete"); + + receiver->recv_message_op = NULL; + sender->send_message_op = NULL; +} + +static void op_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { // This function gets called when we have contents in the unprocessed reads // Get what we want based on our ops wanted // Schedule our appropriate closures - // and then return to reads_needed state if still needed + // and then return to ops_needed state if still needed // Since this is a closure directly invoked by the combiner, it should not // unref the error parameter explicitly; the combiner will do that implicitly @@ -615,12 +571,14 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, bool needs_close = false; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p", arg); inproc_stream *s = (inproc_stream *)arg; gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed gpr_mu_lock(mu); - s->read_closure_scheduled = false; + s->op_closure_scheduled = false; // cancellation takes precedence + inproc_stream *other = s->other_side; + if (s->cancel_self_error != GRPC_ERROR_NONE) { fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); goto done; @@ -632,89 +590,116 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, goto done; } - if (s->recv_initial_md_op) { - if (!s->to_read_initial_md_filled) { - // We entered the state machine on some other kind of read even though - // we still haven't satisfied initial md . That's an error. - new_err = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing"); - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for no " - "initial md %p", - s, new_err); + if (s->send_message_op && other) { + if (other->recv_message_op) { + message_transfer_locked(exec_ctx, s, other); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + } else if (!s->t->is_client && + (s->trailing_md_sent || other->recv_trailing_md_op)) { + // A server send will never be matched if the client is waiting + // for trailing metadata already + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + } + // Pause a send trailing metadata if there is still an outstanding + // send message unless we know that the send message will never get + // matched to a receive. This happens on the client if the server has + // already sent status. + if (s->send_trailing_md_op && + (!s->send_message_op || + (s->t->is_client && + (s->trailing_md_recvd || s->to_read_trailing_md_filled)))) { + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + : &other->to_read_trailing_md; + bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + : &other->to_read_trailing_md_filled; + if (*destfilled || s->trailing_md_sent) { + // The buffer is already in use; that's an error! + INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); + new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; - } else if (s->initial_md_recvd) { + } else { + if (other && !other->closed) { + fill_in_metadata(exec_ctx, s, + s->send_trailing_md_op->payload->send_trailing_metadata + .send_trailing_metadata, + 0, dest, NULL, destfilled); + } + s->trailing_md_sent = true; + if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete", s); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_NONE); + s->recv_trailing_md_op = NULL; + needs_close = true; + } + } + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); + complete_if_batch_end_locked( + exec_ctx, s, GRPC_ERROR_NONE, s->send_trailing_md_op, + "op_state_machine scheduling send-trailing-metadata-on-complete"); + s->send_trailing_md_op = NULL; + } + if (s->recv_initial_md_op) { + if (s->initial_md_recvd) { new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd initial md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); goto done; } - s->initial_md_recvd = true; - new_err = fill_in_metadata( - exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata, - s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL); - s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata - ->deadline = s->deadline; - grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); - s->to_read_initial_md_filled = false; - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-ready %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, - s->recv_initial_md_op->payload->recv_initial_metadata - .recv_initial_metadata_ready, - GRPC_ERROR_REF(new_err)); - if ((s->recv_initial_md_op != s->recv_message_op) && - (s->recv_initial_md_op != s->recv_trailing_md_op)) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling initial-metadata-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, - GRPC_ERROR_REF(new_err)); - } - s->recv_initial_md_op = NULL; - - if (new_err != GRPC_ERROR_NONE) { + if (s->to_read_initial_md_filled) { + s->initial_md_recvd = true; + new_err = fill_in_metadata( + exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata, + s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, + NULL); + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata->deadline = s->deadline; + grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); + s->to_read_initial_md_filled = false; INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors2 %p", s, + "op_state_machine %p scheduling initial-metadata-ready %p", s, new_err); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; + GRPC_CLOSURE_SCHED(exec_ctx, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + GRPC_ERROR_REF(new_err)); + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_initial_md_op, + "op_state_machine scheduling recv-initial-metadata-on-complete"); + s->recv_initial_md_op = NULL; + + if (new_err != GRPC_ERROR_NONE) { + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling on_complete errors2 %p", s, + new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } } } - if (s->to_read_initial_md_filled) { - new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame"); - fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); - goto done; - } - if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) { - inproc_slice_byte_stream_init( - &s->recv_message_stream, - slice_buffer_list_pophead(&s->to_read_message)); - *s->recv_message_op->payload->recv_message.recv_message = - &s->recv_message_stream.base; - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); - GRPC_CLOSURE_SCHED( - exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, - GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); + if (s->recv_message_op) { + if (other && other->send_message_op) { + message_transfer_locked(exec_ctx, other, s); + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } - s->recv_message_op = NULL; + } + if (s->recv_trailing_md_op && s->t->is_client && other && + other->send_message_op) { + maybe_schedule_op_closure_locked(exec_ctx, other, GRPC_ERROR_NONE); } if (s->to_read_trailing_md_filled) { if (s->trailing_md_recvd) { @@ -722,7 +707,7 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md"); INPROC_LOG( GPR_DEBUG, - "read_state_machine %p scheduling on_complete errors for already " + "op_state_machine %p scheduling on_complete errors for already " "recvd trailing md %p", s, new_err); fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); @@ -731,21 +716,24 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, if (s->recv_message_op != NULL) { // This message needs to be wrapped up because it will never be // satisfied - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", - s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } + if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } if (s->recv_trailing_md_op != NULL) { // We wanted trailing metadata and we got it s->trailing_md_recvd = true; @@ -763,61 +751,65 @@ static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, // (If the server hasn't already sent its trailing md, it doesn't have // a final status, so don't mark this op complete) if (s->t->is_client || s->trailing_md_sent) { - INPROC_LOG( - GPR_DEBUG, - "read_state_machine %p scheduling trailing-md-on-complete %p", s, - new_err); + INPROC_LOG(GPR_DEBUG, + "op_state_machine %p scheduling trailing-md-on-complete %p", + s, new_err); GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, GRPC_ERROR_REF(new_err)); s->recv_trailing_md_op = NULL; needs_close = true; } else { INPROC_LOG(GPR_DEBUG, - "read_state_machine %p server needs to delay handling " + "op_state_machine %p server needs to delay handling " "trailing-md-on-complete %p", s, new_err); } } else { INPROC_LOG( GPR_DEBUG, - "read_state_machine %p has trailing md but not yet waiting for it", - s); + "op_state_machine %p has trailing md but not yet waiting for it", s); } } if (s->trailing_md_recvd && s->recv_message_op) { // No further message will come on this stream, so finish off the // recv_message_op - INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); + INPROC_LOG(GPR_DEBUG, "op_state_machine %p scheduling message-ready", s); GRPC_CLOSURE_SCHED( exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, GRPC_ERROR_NONE); - if (s->recv_message_op != s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "read_state_machine %p scheduling message-on-complete %p", s, - new_err); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, - GRPC_ERROR_REF(new_err)); - } + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->recv_message_op, + "op_state_machine scheduling recv-message-on-complete"); s->recv_message_op = NULL; } - if (s->recv_message_op || s->recv_trailing_md_op) { + if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) && + s->send_message_op) { + // Nothing further will try to receive from this stream, so finish off + // any outstanding send_message op + complete_if_batch_end_locked( + exec_ctx, s, new_err, s->send_message_op, + "op_state_machine scheduling send-message-on-complete"); + s->send_message_op = NULL; + } + if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op || + s->recv_message_op || s->recv_trailing_md_op) { // Didn't get the item we wanted so we still need to get // rescheduled - INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s, - s->recv_message_op, s->recv_trailing_md_op); - s->reads_needed = true; + INPROC_LOG( + GPR_DEBUG, "op_state_machine %p still needs closure %p %p %p %p %p", s, + s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op, + s->recv_message_op, s->recv_trailing_md_op); + s->ops_needed = true; } done: if (needs_close) { - close_other_side_locked(exec_ctx, s, "read_state_machine"); + close_other_side_locked(exec_ctx, s, "op_state_machine"); close_stream_locked(exec_ctx, s); } gpr_mu_unlock(mu); GRPC_ERROR_UNREF(new_err); } -static grpc_closure do_nothing_closure; - static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, grpc_error *error) { bool ret = false; // was the cancel accepted @@ -826,14 +818,7 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (s->cancel_self_error == GRPC_ERROR_NONE) { ret = true; s->cancel_self_error = GRPC_ERROR_REF(error); - if (s->reads_needed) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, - GRPC_ERROR_REF(s->cancel_self_error)); - s->read_closure_scheduled = true; - } - s->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, s, s->cancel_self_error); // Send trailing md to the other side indicating cancellation, even if we // already have s->trailing_md_sent = true; @@ -853,14 +838,8 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, if (other->cancel_other_error == GRPC_ERROR_NONE) { other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); } - if (other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, - GRPC_ERROR_REF(other->cancel_other_error)); - other->read_closure_scheduled = true; - } - other->reads_needed = false; - } + maybe_schedule_op_closure_locked(exec_ctx, other, + other->cancel_other_error); } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); } @@ -869,11 +848,9 @@ static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, // couldn't complete that because we hadn't yet sent out trailing // md, now's the chance if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "cancel_stream %p scheduling trailing-md-on-complete %p", s, - s->cancel_self_error); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_REF(s->cancel_self_error)); + complete_if_batch_end_locked( + exec_ctx, s, s->cancel_self_error, s->recv_trailing_md_op, + "cancel_stream scheduling trailing-md-on-complete"); s->recv_trailing_md_op = NULL; } } @@ -918,7 +895,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, // already self-canceled so still give it an error error = GRPC_ERROR_REF(s->cancel_self_error); } else { - INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s, + INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %s%s%s%s%s%s%s", s, + s->t->is_client ? "client" : "server", op->send_initial_metadata ? " send_initial_metadata" : "", op->send_message ? " send_message" : "", op->send_trailing_metadata ? " send_trailing_metadata" : "", @@ -929,10 +907,9 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, bool needs_close = false; + inproc_stream *other = s->other_side; if (error == GRPC_ERROR_NONE && - (op->send_initial_metadata || op->send_message || - op->send_trailing_metadata)) { - inproc_stream *other = s->other_side; + (op->send_initial_metadata || op->send_trailing_metadata)) { if (s->t->is_closed) { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); } @@ -963,72 +940,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->initial_md_sent = true; } } - } - if (error == GRPC_ERROR_NONE && op->send_message) { - size_t remaining = op->payload->send_message.send_message->length; - grpc_slice_buffer *dest = slice_buffer_list_append( - (other == NULL) ? &s->write_buffer_message : &other->to_read_message); - do { - grpc_slice message_slice; - grpc_closure unused; - GPR_ASSERT(grpc_byte_stream_next(exec_ctx, - op->payload->send_message.send_message, - SIZE_MAX, &unused)); - error = grpc_byte_stream_pull( - exec_ctx, op->payload->send_message.send_message, &message_slice); - if (error != GRPC_ERROR_NONE) { - cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); - break; - } - GPR_ASSERT(error == GRPC_ERROR_NONE); - remaining -= GRPC_SLICE_LENGTH(message_slice); - grpc_slice_buffer_add(dest, message_slice); - } while (remaining != 0); - grpc_byte_stream_destroy(exec_ctx, - op->payload->send_message.send_message); - } - if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { - grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md - : &other->to_read_trailing_md; - bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled - : &other->to_read_trailing_md_filled; - if (*destfilled || s->trailing_md_sent) { - // The buffer is already in use; that's an error! - INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); - } else { - if (!other->closed) { - fill_in_metadata( - exec_ctx, s, - op->payload->send_trailing_metadata.send_trailing_metadata, 0, - dest, NULL, destfilled); - } - s->trailing_md_sent = true; - if (!s->t->is_client && s->trailing_md_recvd && - s->recv_trailing_md_op) { - INPROC_LOG(GPR_DEBUG, - "perform_stream_op %p scheduling trailing-md-on-complete", - s); - GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, - GRPC_ERROR_NONE); - s->recv_trailing_md_op = NULL; - needs_close = true; - } - } - } - if (other != NULL && other->reads_needed) { - if (!other->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error); - other->read_closure_scheduled = true; - } - other->reads_needed = false; + maybe_schedule_op_closure_locked(exec_ctx, other, error); } } + if (error == GRPC_ERROR_NONE && - (op->recv_initial_metadata || op->recv_message || + (op->send_message || op->send_trailing_metadata || + op->recv_initial_metadata || op->recv_message || op->recv_trailing_metadata)) { - // If there are any reads, mark it so that the read closure will react to - // them + // Mark ops that need to be processed by the closure + if (op->send_message) { + s->send_message_op = op; + } + if (op->send_trailing_metadata) { + s->send_trailing_md_op = op; + } if (op->recv_initial_metadata) { s->recv_initial_md_op = op; } @@ -1040,25 +966,28 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } // We want to initiate the closure if: - // 1. There is initial metadata and something ready to take that - // 2. There is a message and something ready to take it - // 3. There is trailing metadata, even if nothing specifically wants - // that because that can shut down the message as well - if ((s->to_read_initial_md_filled && op->recv_initial_metadata) || - ((!slice_buffer_list_empty(&s->to_read_message) || - s->trailing_md_recvd) && - op->recv_message) || - (s->to_read_trailing_md_filled)) { - if (!s->read_closure_scheduled) { - GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE); - s->read_closure_scheduled = true; + // 1. We want to send a message and the other side wants to receive or end + // 2. We want to send trailing metadata and there isn't an unmatched send + // 3. We want initial metadata and the other side has sent it + // 4. We want to receive a message and there is a message ready + // 5. There is trailing metadata, even if nothing specifically wants + // that because that can shut down the receive message as well + if ((op->send_message && other && ((other->recv_message_op != NULL) || + (other->recv_trailing_md_op != NULL))) || + (op->send_trailing_metadata && !op->send_message) || + (op->recv_initial_metadata && s->to_read_initial_md_filled) || + (op->recv_message && (other && other->send_message_op != NULL)) || + (s->to_read_trailing_md_filled || s->trailing_md_recvd)) { + if (!s->op_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->op_closure, GRPC_ERROR_NONE); + s->op_closure_scheduled = true; } } else { - s->reads_needed = true; + s->ops_needed = true; } } else { if (error != GRPC_ERROR_NONE) { - // Schedule op's read closures that we didn't push to read state machine + // Schedule op's closures that we didn't push to op state machine if (op->recv_initial_metadata) { INPROC_LOG( GPR_DEBUG, diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index e1dc69994c..f7f996d5c1 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -24,15 +24,15 @@ import hashlib FixtureOptions = collections.namedtuple( 'FixtureOptions', - 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth') + 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth supports_write_buffering') default_unsecure_fixture_options = FixtureOptions( - True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False) + True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False, True) socketpair_unsecure_fixture_options = default_unsecure_fixture_options._replace(fullstack=False, dns_resolver=False) default_secure_fixture_options = default_unsecure_fixture_options._replace(secure=True) uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv']) fd_unsecure_fixture_options = default_unsecure_fixture_options._replace( dns_resolver=False, fullstack=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv']) -inproc_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, fullstack=False, name_resolution=False, supports_compression=False, is_inproc=True, is_http2=False) +inproc_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, fullstack=False, name_resolution=False, supports_compression=False, is_inproc=True, is_http2=False, supports_write_buffering=False) # maps fixture name to whether it requires the security library END2END_FIXTURES = { @@ -68,8 +68,8 @@ END2END_FIXTURES = { TestOptions = collections.namedtuple( 'TestOptions', - 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth') -default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False) + 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth needs_write_buffering') +default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False, False) connectivity_test_options = default_test_options._replace(needs_fullstack=True) LOWCPU = 0.1 @@ -146,8 +146,10 @@ END2END_TESTS = { 'streaming_error_response': default_test_options._replace(cpu_cost=LOWCPU), 'trailing_metadata': default_test_options, 'workaround_cronet_compression': default_test_options, - 'write_buffering': default_test_options._replace(cpu_cost=LOWCPU), - 'write_buffering_at_end': default_test_options._replace(cpu_cost=LOWCPU), + 'write_buffering': default_test_options._replace(cpu_cost=LOWCPU, + needs_write_buffering=True), + 'write_buffering_at_end': default_test_options._replace(cpu_cost=LOWCPU, + needs_write_buffering=True), } @@ -185,6 +187,9 @@ def compatible(f, t): if END2END_TESTS[t].needs_proxy_auth: if not END2END_FIXTURES[f].supports_proxy_auth: return False + if END2END_TESTS[t].needs_write_buffering: + if not END2END_FIXTURES[f].supports_write_buffering: + return False return True diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl index d48ddb4606..89a95edfd7 100755 --- a/test/core/end2end/generate_tests.bzl +++ b/test/core/end2end/generate_tests.bzl @@ -21,7 +21,8 @@ load("//bazel:grpc_build_system.bzl", "grpc_sh_test", "grpc_cc_binary", "grpc_cc def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True, name_resolution=True, secure=True, tracing=False, platforms=['windows', 'linux', 'mac', 'posix'], - is_inproc=False, is_http2=True, supports_proxy_auth=False): + is_inproc=False, is_http2=True, supports_proxy_auth=False, + supports_write_buffering=True): return struct( fullstack=fullstack, includes_proxy=includes_proxy, @@ -31,7 +32,8 @@ def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True, tracing=tracing, is_inproc=is_inproc, is_http2=is_http2, - supports_proxy_auth=supports_proxy_auth + supports_proxy_auth=supports_proxy_auth, + supports_write_buffering=supports_write_buffering #platforms=platforms ) @@ -61,14 +63,14 @@ END2END_FIXTURES = { platforms=['linux', 'mac', 'posix']), 'inproc': fixture_options(fullstack=False, dns_resolver=False, name_resolution=False, is_inproc=True, - is_http2=False), + is_http2=False, supports_write_buffering=False), } def test_options(needs_fullstack=False, needs_dns=False, needs_names=False, proxyable=True, secure=False, traceable=False, exclude_inproc=False, needs_http2=False, - needs_proxy_auth=False): + needs_proxy_auth=False, needs_write_buffering=False): return struct( needs_fullstack=needs_fullstack, needs_dns=needs_dns, @@ -78,7 +80,8 @@ def test_options(needs_fullstack=False, needs_dns=False, needs_names=False, traceable=traceable, exclude_inproc=exclude_inproc, needs_http2=needs_http2, - needs_proxy_auth=needs_proxy_auth + needs_proxy_auth=needs_proxy_auth, + needs_write_buffering=needs_write_buffering ) @@ -144,8 +147,8 @@ END2END_TESTS = { 'authority_not_supported': test_options(), 'filter_latency': test_options(), 'workaround_cronet_compression': test_options(), - 'write_buffering': test_options(), - 'write_buffering_at_end': test_options(), + 'write_buffering': test_options(needs_write_buffering=True), + 'write_buffering_at_end': test_options(needs_write_buffering=True), } @@ -174,6 +177,9 @@ def compatible(fopt, topt): if topt.needs_proxy_auth: if not fopt.supports_proxy_auth: return False + if topt.needs_write_buffering: + if not fopt.supports_write_buffering: + return False return True diff --git a/test/core/end2end/tests/streaming_error_response.c b/test/core/end2end/tests/streaming_error_response.c index 9d562b9090..8891b8674c 100644 --- a/test/core/end2end/tests/streaming_error_response.c +++ b/test/core/end2end/tests/streaming_error_response.c @@ -183,7 +183,6 @@ static void test(grpc_end2end_test_config config, bool request_status_early) { GPR_ASSERT(GRPC_CALL_OK == error); CQ_EXPECT_COMPLETION(cqv, tag(103), 1); - cq_verify(cqv); if (!request_status_early) { memset(ops, 0, sizeof(ops)); diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc index a14b4d5295..2a33e8ae11 100644 --- a/test/cpp/end2end/async_end2end_test.cc +++ b/test/cpp/end2end/async_end2end_test.cc @@ -1304,7 +1304,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { ServerTryCancelRequestPhase server_try_cancel) { ResetStub(); - EchoRequest send_request; EchoRequest recv_request; EchoResponse send_response; EchoResponse recv_response; @@ -1315,31 +1314,24 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { ServerAsyncReader srv_stream(&srv_ctx); // Initiate the 'RequestStream' call on client + CompletionQueue cli_cq; + std::unique_ptr> cli_stream( - stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1))); - Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); + stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1))); // On the server, request to be notified of 'RequestStream' calls // and receive the 'RequestStream' call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); + std::thread t1([this, &cli_cq] { + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq); + }); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); - - // Client sends 3 messages (tags 3, 4 and 5) - for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { - send_request.set_message("Ping " + grpc::to_string(tag_idx)); - cli_stream->Write(send_request, tag(tag_idx)); - Verifier(GetParam().disable_blocking) - .Expect(tag_idx, true) - .Verify(cq_.get()); - } - cli_stream->WritesDone(tag(6)); - Verifier(GetParam().disable_blocking).Expect(6, true).Verify(cq_.get()); + t1.join(); bool expected_server_cq_result = true; - bool ignore_cq_result = false; - bool want_done_tag = false; + bool expected_client_cq_result = true; if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); @@ -1347,10 +1339,36 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { EXPECT_TRUE(srv_ctx.IsCancelled()); // Since cancellation is done before server reads any results, we know - // for sure that all cq results will return false from this point forward + // for sure that all server cq results will return false from this + // point forward expected_server_cq_result = false; + expected_client_cq_result = false; } + bool ignore_client_cq_result = + (server_try_cancel == CANCEL_DURING_PROCESSING) || + (server_try_cancel == CANCEL_BEFORE_PROCESSING); + + std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result, + &ignore_client_cq_result, this] { + EchoRequest send_request; + // Client sends 3 messages (tags 3, 4 and 5) + for (int tag_idx = 3; tag_idx <= 5; tag_idx++) { + send_request.set_message("Ping " + grpc::to_string(tag_idx)); + cli_stream->Write(send_request, tag(tag_idx)); + Verifier(GetParam().disable_blocking) + .Expect(tag_idx, expected_client_cq_result) + .Verify(&cli_cq, ignore_client_cq_result); + } + cli_stream->WritesDone(tag(6)); + // Ignore ok on WritesDone since cancel can affect it + Verifier(GetParam().disable_blocking) + .Expect(6, expected_client_cq_result) + .Verify(&cli_cq, ignore_client_cq_result); + }); + + bool ignore_cq_result = false; + bool want_done_tag = false; std::thread* server_try_cancel_thd = nullptr; auto verif = Verifier(GetParam().disable_blocking); @@ -1387,6 +1405,8 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { } } + cli_thread.join(); + if (server_try_cancel_thd != nullptr) { server_try_cancel_thd->join(); delete server_try_cancel_thd; @@ -1415,9 +1435,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + + cli_cq.Shutdown(); + void* dummy_tag; + bool dummy_ok; + while (cli_cq.Next(&dummy_tag, &dummy_ok)) { + } } // Helper for testing server-streaming RPCs which are cancelled on the server. @@ -1439,7 +1465,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { EchoRequest send_request; EchoRequest recv_request; EchoResponse send_response; - EchoResponse recv_response; Status recv_status; ClientContext cli_ctx; ServerContext srv_ctx; @@ -1447,20 +1472,29 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { send_request.set_message("Ping"); // Initiate the 'ResponseStream' call on the client + CompletionQueue cli_cq; std::unique_ptr> cli_stream( - stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1))); - Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); + stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1))); // On the server, request to be notified of 'ResponseStream' calls and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream, cq_.get(), cq_.get(), tag(2)); + + std::thread t1([this, &cli_cq] { + Verifier(GetParam().disable_blocking).Expect(1, true).Verify(&cli_cq); + }); Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + t1.join(); + EXPECT_EQ(send_request.message(), recv_request.message()); bool expected_cq_result = true; bool ignore_cq_result = false; bool want_done_tag = false; + bool expected_client_cq_result = true; + bool ignore_client_cq_result = + (server_try_cancel != CANCEL_BEFORE_PROCESSING); if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); @@ -1470,8 +1504,21 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // We know for sure that all cq results will be false from this point // since the server cancelled the RPC expected_cq_result = false; + expected_client_cq_result = false; } + std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result, + &ignore_client_cq_result, this] { + // Client attempts to read the three messages from the server + for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { + EchoResponse recv_response; + cli_stream->Read(&recv_response, tag(tag_idx)); + Verifier(GetParam().disable_blocking) + .Expect(tag_idx, expected_client_cq_result) + .Verify(&cli_cq, ignore_client_cq_result); + } + }); + std::thread* server_try_cancel_thd = nullptr; auto verif = Verifier(GetParam().disable_blocking); @@ -1519,10 +1566,6 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { srv_ctx.TryCancel(); want_done_tag = true; verif.Expect(11, true); - - // Client reads may fail bacause it is notified that the stream is - // cancelled. - ignore_cq_result = true; } if (want_done_tag) { @@ -1531,13 +1574,7 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { want_done_tag = false; } - // Client attemts to read the three messages from the server - for (int tag_idx = 6; tag_idx <= 8; tag_idx++) { - cli_stream->Read(&recv_response, tag(tag_idx)); - Verifier(GetParam().disable_blocking) - .Expect(tag_idx, expected_cq_result) - .Verify(cq_.get(), ignore_cq_result); - } + cli_thread.join(); // The RPC has been cancelled at this point for sure (i.e irrespective of // the value of `server_try_cancel` is). So, from this point forward, we @@ -1549,9 +1586,15 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Client will see the cancellation cli_stream->Finish(&recv_status, tag(10)); - Verifier(GetParam().disable_blocking).Expect(10, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking).Expect(10, true).Verify(&cli_cq); EXPECT_FALSE(recv_status.ok()); EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code()); + + cli_cq.Shutdown(); + void* dummy_tag; + bool dummy_ok; + while (cli_cq.Next(&dummy_tag, &dummy_ok)) { + } } // Helper for testing bidirectinal-streaming RPCs which are cancelled on the @@ -1584,38 +1627,52 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { // Initiate the call from the client side std::unique_ptr> cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1))); - Verifier(GetParam().disable_blocking).Expect(1, true).Verify(cq_.get()); // On the server, request to be notified of the 'BidiStream' call and // receive the call just made by the client srv_ctx.AsyncNotifyWhenDone(tag(11)); service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(), tag(2)); - Verifier(GetParam().disable_blocking).Expect(2, true).Verify(cq_.get()); + Verifier(GetParam().disable_blocking) + .Expect(1, true) + .Expect(2, true) + .Verify(cq_.get()); + + auto verif = Verifier(GetParam().disable_blocking); // Client sends the first and the only message send_request.set_message("Ping"); cli_stream->Write(send_request, tag(3)); - Verifier(GetParam().disable_blocking).Expect(3, true).Verify(cq_.get()); + verif.Expect(3, true); bool expected_cq_result = true; bool ignore_cq_result = false; bool want_done_tag = false; + int got_tag, got_tag2; + bool tag_3_done = false; + if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { srv_ctx.TryCancel(); - Verifier(GetParam().disable_blocking).Expect(11, true).Verify(cq_.get()); - EXPECT_TRUE(srv_ctx.IsCancelled()); - - // We know for sure that all cq results will be false from this point - // since the server cancelled the RPC + verif.Expect(11, true); + // We know for sure that all server cq results will be false from + // this point since the server cancelled the RPC. However, we can't + // say for sure about the client expected_cq_result = false; + ignore_cq_result = true; + + do { + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11)); + if (got_tag == 3) { + tag_3_done = true; + } + } while (got_tag != 11); + EXPECT_TRUE(srv_ctx.IsCancelled()); } std::thread* server_try_cancel_thd = nullptr; - auto verif = Verifier(GetParam().disable_blocking); - if (server_try_cancel == CANCEL_DURING_PROCESSING) { server_try_cancel_thd = new std::thread(&ServerContext::TryCancel, &srv_ctx); @@ -1630,39 +1687,42 @@ class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest { verif.Expect(11, true); } - int got_tag; srv_stream.Read(&recv_request, tag(4)); verif.Expect(4, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 4) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { + got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result); + got_tag2 = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 3) || (got_tag == 4) || + (got_tag == 11 && want_done_tag)); + GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) || + (got_tag2 == 11 && want_done_tag)); + // If we get 3 and 4, we don't need to wait for 11, but if + // we get 11, we should also clear 3 and 4 + if (got_tag + got_tag2 != 7) { EXPECT_TRUE(srv_ctx.IsCancelled()); want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 4); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 3) || (got_tag == 4)); } send_response.set_message("Pong"); srv_stream.Write(send_response, tag(5)); verif.Expect(5, expected_cq_result); - got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 5) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { - EXPECT_TRUE(srv_ctx.IsCancelled()); - want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 5); - } cli_stream->Read(&recv_response, tag(6)); verif.Expect(6, expected_cq_result); got_tag = verif.Next(cq_.get(), ignore_cq_result); - GPR_ASSERT((got_tag == 6) || (got_tag == 11 && want_done_tag)); - if (got_tag == 11) { + got_tag2 = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 5) || (got_tag == 6) || + (got_tag == 11 && want_done_tag)); + GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) || + (got_tag2 == 11 && want_done_tag)); + // If we get 5 and 6, we don't need to wait for 11, but if + // we get 11, we should also clear 5 and 6 + if (got_tag + got_tag2 != 11) { EXPECT_TRUE(srv_ctx.IsCancelled()); want_done_tag = false; - // Now get the other entry that we were waiting on - EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 6); + got_tag = verif.Next(cq_.get(), ignore_cq_result); + GPR_ASSERT((got_tag == 5) || (got_tag == 6)); } // This is expected to succeed in all cases diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 91c7d4c38e..a1644dfa09 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -29976,52 +29976,6 @@ "posix" ] }, - { - "args": [ - "write_buffering" - ], - "ci_platforms": [ - "windows", - "linux", - "mac", - "posix" - ], - "cpu_cost": 0.1, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "language": "c", - "name": "inproc_test", - "platforms": [ - "windows", - "linux", - "mac", - "posix" - ] - }, - { - "args": [ - "write_buffering_at_end" - ], - "ci_platforms": [ - "windows", - "linux", - "mac", - "posix" - ], - "cpu_cost": 0.1, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "language": "c", - "name": "inproc_test", - "platforms": [ - "windows", - "linux", - "mac", - "posix" - ] - }, { "args": [ "authority_not_supported" @@ -48359,52 +48313,6 @@ "posix" ] }, - { - "args": [ - "write_buffering" - ], - "ci_platforms": [ - "windows", - "linux", - "mac", - "posix" - ], - "cpu_cost": 0.1, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "language": "c", - "name": "inproc_nosec_test", - "platforms": [ - "windows", - "linux", - "mac", - "posix" - ] - }, - { - "args": [ - "write_buffering_at_end" - ], - "ci_platforms": [ - "windows", - "linux", - "mac", - "posix" - ], - "cpu_cost": 0.1, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "language": "c", - "name": "inproc_nosec_test", - "platforms": [ - "windows", - "linux", - "mac", - "posix" - ] - }, { "args": [ "--scenarios_json", -- cgit v1.2.3 From fa7ae246ca4779b9351792f4f1ac3e5852ff4844 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 12 Oct 2017 09:37:57 -0700 Subject: Add call combiner stats --- src/core/lib/debug/stats_data.cc | 5 +++++ src/core/lib/debug/stats_data.h | 8 ++++++++ src/core/lib/debug/stats_data.yaml | 7 ++++++- src/core/lib/debug/stats_data_bq_schema.sql | 2 ++ src/core/lib/iomgr/call_combiner.cc | 3 +++ tools/run_tests/performance/massage_qps_stats.py | 2 ++ .../performance/scenario_result_schema.json | 20 ++++++++++++++++++++ 7 files changed, 46 insertions(+), 1 deletion(-) (limited to 'src/core') diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc index b4ae8f312c..2271c87e56 100644 --- a/src/core/lib/debug/stats_data.cc +++ b/src/core/lib/debug/stats_data.cc @@ -104,6 +104,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "combiner_locks_scheduled_items", "combiner_locks_scheduled_final_items", "combiner_locks_offloaded", + "call_combiner_locks_initiated", + "call_combiner_locks_scheduled_items", "executor_scheduled_short_items", "executor_scheduled_long_items", "executor_scheduled_to_self", @@ -213,6 +215,9 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of items scheduled against combiner locks", "Number of final items scheduled against combiner locks", "Number of combiner locks offloaded to different threads", + "Number of call combiner lock entries by process (first items queued to a " + "call combiner)", + "Number of items scheduled against call combiner locks", "Number of finite runtime closures scheduled against the executor (gRPC " "thread pool)", "Number of potentially infinite runtime closures scheduled against the " diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index d4c4437ca0..bfa5c1a61b 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -110,6 +110,8 @@ typedef enum { GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_SCHEDULED_FINAL_ITEMS, GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED, + GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED, + GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF, @@ -407,6 +409,12 @@ typedef enum { #define GRPC_STATS_INC_COMBINER_LOCKS_OFFLOADED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED) +#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED) +#define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS) #define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS) diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 00e81e74e2..d12c8df663 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -245,6 +245,12 @@ doc: Number of final items scheduled against combiner locks - counter: combiner_locks_offloaded doc: Number of combiner locks offloaded to different threads +# call combiner locks +- counter: call_combiner_locks_initiated + doc: Number of call combiner lock entries by process + (first items queued to a call combiner) +- counter: call_combiner_locks_scheduled_items + doc: Number of items scheduled against call combiner locks # executor - counter: executor_scheduled_short_items doc: Number of finite runtime closures scheduled against the executor @@ -282,4 +288,3 @@ - counter: cq_ev_queue_transient_pop_failures doc: Number of times NULL was popped out of completion queue's event queue even though the event queue was not empty - diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index f34b0c25a0..5e541b9181 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -79,6 +79,8 @@ combiner_locks_initiated_per_iteration:FLOAT, combiner_locks_scheduled_items_per_iteration:FLOAT, combiner_locks_scheduled_final_items_per_iteration:FLOAT, combiner_locks_offloaded_per_iteration:FLOAT, +call_combiner_locks_initiated_per_iteration:FLOAT, +call_combiner_locks_scheduled_items_per_iteration:FLOAT, executor_scheduled_short_items_per_iteration:FLOAT, executor_scheduled_long_items_per_iteration:FLOAT, executor_scheduled_to_self_per_iteration:FLOAT, diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index bab3df021a..8b08d307a1 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -21,6 +21,7 @@ #include #include +#include "src/core/lib/debug/stats.h" grpc_tracer_flag grpc_call_combiner_trace = GRPC_TRACER_INITIALIZER(false, "call_combiner"); @@ -73,7 +74,9 @@ void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, prev_size + 1); } + GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx); if (prev_size == 0) { + GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED(exec_ctx); if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); } diff --git a/tools/run_tests/performance/massage_qps_stats.py b/tools/run_tests/performance/massage_qps_stats.py index e0b6ce6ba6..eadd17db8e 100644 --- a/tools/run_tests/performance/massage_qps_stats.py +++ b/tools/run_tests/performance/massage_qps_stats.py @@ -101,6 +101,8 @@ def massage_qps_stats(scenario_result): stats["core_combiner_locks_scheduled_items"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_scheduled_items") stats["core_combiner_locks_scheduled_final_items"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_scheduled_final_items") stats["core_combiner_locks_offloaded"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_offloaded") + stats["core_call_combiner_locks_initiated"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_locks_initiated") + stats["core_call_combiner_locks_scheduled_items"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_locks_scheduled_items") stats["core_executor_scheduled_short_items"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_short_items") stats["core_executor_scheduled_long_items"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_long_items") stats["core_executor_scheduled_to_self"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_to_self") diff --git a/tools/run_tests/performance/scenario_result_schema.json b/tools/run_tests/performance/scenario_result_schema.json index f11e6359f6..1d726539b9 100644 --- a/tools/run_tests/performance/scenario_result_schema.json +++ b/tools/run_tests/performance/scenario_result_schema.json @@ -515,6 +515,16 @@ "name": "core_combiner_locks_offloaded", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_locks_initiated", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_locks_scheduled_items", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "core_executor_scheduled_short_items", @@ -1327,6 +1337,16 @@ "name": "core_combiner_locks_offloaded", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_locks_initiated", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_locks_scheduled_items", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "core_executor_scheduled_short_items", -- cgit v1.2.3 From 2638cd9ff5e9f99a436f4369db46c904fae78d43 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 12 Oct 2017 09:41:55 -0700 Subject: Track where combiners are initiated in latency traces --- src/core/lib/iomgr/combiner.cc | 1 + 1 file changed, 1 insertion(+) (limited to 'src/core') diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 0e707ef839..53f4b7eaa7 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -165,6 +165,7 @@ static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_closure *cl, lock, cl, last)); if (last == 1) { GRPC_STATS_INC_COMBINER_LOCKS_INITIATED(exec_ctx); + GPR_TIMER_MARK("combiner.initiated", 0); gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, (gpr_atm)exec_ctx); // first element on this list: add it to the list of combiner locks -- cgit v1.2.3 From 7befe5d8e568da23edab19b02b953a33dd973e9f Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Thu, 12 Oct 2017 09:53:02 -0700 Subject: Add counters --- src/core/lib/debug/stats_data.cc | 4 ++++ src/core/lib/debug/stats_data.h | 7 +++++++ src/core/lib/debug/stats_data.yaml | 4 ++++ src/core/lib/debug/stats_data_bq_schema.sql | 2 ++ src/core/lib/iomgr/call_combiner.cc | 2 ++ tools/run_tests/performance/massage_qps_stats.py | 2 ++ .../performance/scenario_result_schema.json | 20 ++++++++++++++++++++ 7 files changed, 41 insertions(+) (limited to 'src/core') diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc index 2271c87e56..5d737c56cb 100644 --- a/src/core/lib/debug/stats_data.cc +++ b/src/core/lib/debug/stats_data.cc @@ -106,6 +106,8 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "combiner_locks_offloaded", "call_combiner_locks_initiated", "call_combiner_locks_scheduled_items", + "call_combiner_set_notify_on_cancel", + "call_combiner_cancelled", "executor_scheduled_short_items", "executor_scheduled_long_items", "executor_scheduled_to_self", @@ -218,6 +220,8 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of call combiner lock entries by process (first items queued to a " "call combiner)", "Number of items scheduled against call combiner locks", + "Number of times a cancellation callback was set on a call combiner", + "Number of times a call combiner was cancelled", "Number of finite runtime closures scheduled against the executor (gRPC " "thread pool)", "Number of potentially infinite runtime closures scheduled against the " diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index bfa5c1a61b..031942df5c 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -112,6 +112,8 @@ typedef enum { GRPC_STATS_COUNTER_COMBINER_LOCKS_OFFLOADED, GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_INITIATED, GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS, + GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL, + GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_LONG_ITEMS, GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_TO_SELF, @@ -415,6 +417,11 @@ typedef enum { #define GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS(exec_ctx) \ GRPC_STATS_INC_COUNTER( \ (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS) +#define GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_SET_NOTIFY_ON_CANCEL) +#define GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), GRPC_STATS_COUNTER_CALL_COMBINER_CANCELLED) #define GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_EXECUTOR_SCHEDULED_SHORT_ITEMS) diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index d12c8df663..af4553028e 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -251,6 +251,10 @@ (first items queued to a call combiner) - counter: call_combiner_locks_scheduled_items doc: Number of items scheduled against call combiner locks +- counter: call_combiner_set_notify_on_cancel + doc: Number of times a cancellation callback was set on a call combiner +- counter: call_combiner_cancelled + doc: Number of times a call combiner was cancelled # executor - counter: executor_scheduled_short_items doc: Number of finite runtime closures scheduled against the executor diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 5e541b9181..04b6d471f6 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -81,6 +81,8 @@ combiner_locks_scheduled_final_items_per_iteration:FLOAT, combiner_locks_offloaded_per_iteration:FLOAT, call_combiner_locks_initiated_per_iteration:FLOAT, call_combiner_locks_scheduled_items_per_iteration:FLOAT, +call_combiner_set_notify_on_cancel_per_iteration:FLOAT, +call_combiner_cancelled_per_iteration:FLOAT, executor_scheduled_short_items_per_iteration:FLOAT, executor_scheduled_long_items_per_iteration:FLOAT, executor_scheduled_to_self_per_iteration:FLOAT, diff --git a/src/core/lib/iomgr/call_combiner.cc b/src/core/lib/iomgr/call_combiner.cc index 8b08d307a1..0ba32a1e46 100644 --- a/src/core/lib/iomgr/call_combiner.cc +++ b/src/core/lib/iomgr/call_combiner.cc @@ -138,6 +138,7 @@ void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner, grpc_closure* closure) { + GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL(exec_ctx); while (true) { // Decode original state. gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); @@ -182,6 +183,7 @@ void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, grpc_call_combiner* call_combiner, grpc_error* error) { + GRPC_STATS_INC_CALL_COMBINER_CANCELLED(exec_ctx); while (true) { gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); grpc_error* original_error = decode_cancel_state_error(original_state); diff --git a/tools/run_tests/performance/massage_qps_stats.py b/tools/run_tests/performance/massage_qps_stats.py index eadd17db8e..48c57581a5 100644 --- a/tools/run_tests/performance/massage_qps_stats.py +++ b/tools/run_tests/performance/massage_qps_stats.py @@ -103,6 +103,8 @@ def massage_qps_stats(scenario_result): stats["core_combiner_locks_offloaded"] = massage_qps_stats_helpers.counter(core_stats, "combiner_locks_offloaded") stats["core_call_combiner_locks_initiated"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_locks_initiated") stats["core_call_combiner_locks_scheduled_items"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_locks_scheduled_items") + stats["core_call_combiner_set_notify_on_cancel"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_set_notify_on_cancel") + stats["core_call_combiner_cancelled"] = massage_qps_stats_helpers.counter(core_stats, "call_combiner_cancelled") stats["core_executor_scheduled_short_items"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_short_items") stats["core_executor_scheduled_long_items"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_long_items") stats["core_executor_scheduled_to_self"] = massage_qps_stats_helpers.counter(core_stats, "executor_scheduled_to_self") diff --git a/tools/run_tests/performance/scenario_result_schema.json b/tools/run_tests/performance/scenario_result_schema.json index 1d726539b9..b00c2eed16 100644 --- a/tools/run_tests/performance/scenario_result_schema.json +++ b/tools/run_tests/performance/scenario_result_schema.json @@ -525,6 +525,16 @@ "name": "core_call_combiner_locks_scheduled_items", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_set_notify_on_cancel", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_cancelled", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "core_executor_scheduled_short_items", @@ -1347,6 +1357,16 @@ "name": "core_call_combiner_locks_scheduled_items", "type": "INTEGER" }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_set_notify_on_cancel", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_call_combiner_cancelled", + "type": "INTEGER" + }, { "mode": "NULLABLE", "name": "core_executor_scheduled_short_items", -- cgit v1.2.3 From fbf8128da8ea1dd55729c957e915afedb8754f0d Mon Sep 17 00:00:00 2001 From: Frank Groeneveld Date: Thu, 12 Oct 2017 08:27:14 +0200 Subject: Add OpenBSD support --- Makefile | 2 +- build.yaml | 4 +- grpc.gemspec | 1 + include/grpc/impl/codegen/port_platform.h | 23 + setup.py | 2 + src/c-ares/gen_build_yaml.py | 5 +- src/core/lib/iomgr/port.h | 10 + src/core/lib/support/time_posix.cc | 2 +- src/ruby/ext/grpc/extconf.rb | 4 +- third_party/cares/config_openbsd/ares_config.h | 502 +++++++++++++++++++++ tools/run_tests/generated/sources_and_headers.json | 3 +- 11 files changed, 551 insertions(+), 7 deletions(-) create mode 100644 third_party/cares/config_openbsd/ares_config.h (limited to 'src/core') diff --git a/Makefile b/Makefile index 0357f7baac..7cf412d21d 100644 --- a/Makefile +++ b/Makefile @@ -8423,7 +8423,7 @@ PUBLIC_HEADERS_C += \ LIBARES_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(LIBARES_SRC)))) -$(LIBARES_OBJS): CPPFLAGS += -Ithird_party/cares -Ithird_party/cares/cares $(if $(subst Linux,,$(SYSTEM)),,-Ithird_party/cares/config_linux) $(if $(subst Darwin,,$(SYSTEM)),,-Ithird_party/cares/config_darwin) -fvisibility=hidden -D_GNU_SOURCE -DWIN32_LEAN_AND_MEAN -D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,) +$(LIBARES_OBJS): CPPFLAGS += -Ithird_party/cares -Ithird_party/cares/cares $(if $(subst Linux,,$(SYSTEM)),,-Ithird_party/cares/config_linux) $(if $(subst Darwin,,$(SYSTEM)),,-Ithird_party/cares/config_darwin) -fvisibility=hidden $(if $(subst OpenBSD,,$(SYSTEM)),,-Ithird_party/cares/config_openbsd) -D_GNU_SOURCE -DWIN32_LEAN_AND_MEAN -D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,) $(LIBARES_OBJS): CFLAGS += -Wno-sign-conversion $(if $(subst Darwin,,$(SYSTEM)),,-Wno-shorten-64-to-32) $(if $(subst MINGW32,,$(SYSTEM)),-Wno-invalid-source-encoding,) $(LIBDIR)/$(CONFIG)/libares.a: $(ZLIB_DEP) $(LIBARES_OBJS) diff --git a/build.yaml b/build.yaml index fac62d21d1..2237fe10cd 100644 --- a/build.yaml +++ b/build.yaml @@ -4872,8 +4872,8 @@ defaults: $(if $(subst MINGW32,,$(SYSTEM)),-Wno-invalid-source-encoding,) CPPFLAGS: -Ithird_party/cares -Ithird_party/cares/cares $(if $(subst Linux,,$(SYSTEM)),,-Ithird_party/cares/config_linux) $(if $(subst Darwin,,$(SYSTEM)),,-Ithird_party/cares/config_darwin) -fvisibility=hidden - -D_GNU_SOURCE -DWIN32_LEAN_AND_MEAN -D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst - MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,) + $(if $(subst OpenBSD,,$(SYSTEM)),,-Ithird_party/cares/config_openbsd) -D_GNU_SOURCE + -DWIN32_LEAN_AND_MEAN -D_HAS_EXCEPTIONS=0 -DNOMINMAX $(if $(subst MINGW32,,$(SYSTEM)),-DHAVE_CONFIG_H,) benchmark: CPPFLAGS: -Ithird_party/benchmark/include -DHAVE_POSIX_REGEX boringssl: diff --git a/grpc.gemspec b/grpc.gemspec index ce23e6f7df..6b35d7a7c1 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1125,6 +1125,7 @@ Gem::Specification.new do |s| s.files += %w( third_party/cares/ares_build.h ) s.files += %w( third_party/cares/config_linux/ares_config.h ) s.files += %w( third_party/cares/config_darwin/ares_config.h ) + s.files += %w( third_party/cares/config_openbsd/ares_config.h ) s.files += %w( third_party/cares/cares/ares__close_sockets.c ) s.files += %w( third_party/cares/cares/ares__get_hostent.c ) s.files += %w( third_party/cares/cares/ares__read_line.c ) diff --git a/include/grpc/impl/codegen/port_platform.h b/include/grpc/impl/codegen/port_platform.h index baea4bcf52..fb4bfc3162 100644 --- a/include/grpc/impl/codegen/port_platform.h +++ b/include/grpc/impl/codegen/port_platform.h @@ -241,6 +241,29 @@ #else /* _LP64 */ #define GPR_ARCH_32 1 #endif /* _LP64 */ +#elif defined(__OpenBSD__) +#define GPR_PLATFORM_STRING "openbsd" +#ifndef _BSD_SOURCE +#define _BSD_SOURCE +#endif +#define GPR_OPENBSD 1 +#define GPR_CPU_POSIX 1 +#define GPR_GCC_ATOMIC 1 +#define GPR_GCC_TLS 1 +#define GPR_POSIX_LOG 1 +#define GPR_POSIX_ENV 1 +#define GPR_POSIX_TMPFILE 1 +#define GPR_POSIX_STRING 1 +#define GPR_POSIX_SUBPROCESS 1 +#define GPR_POSIX_SYNC 1 +#define GPR_POSIX_TIME 1 +#define GPR_GETPID_IN_UNISTD_H 1 +#define GPR_SUPPORT_CHANNELS_FROM_FD 1 +#ifdef _LP64 +#define GPR_ARCH_64 1 +#else /* _LP64 */ +#define GPR_ARCH_32 1 +#endif /* _LP64 */ #elif defined(__native_client__) #define GPR_PLATFORM_STRING "nacl" #ifndef _BSD_SOURCE diff --git a/setup.py b/setup.py index 90c9316b0d..5a85ed8545 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,8 @@ if 'linux' in sys.platform: CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_linux'),) if 'darwin' in sys.platform: CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_darwin'),) +if 'openbsd' in sys.platform: + CARES_INCLUDE += (os.path.join('third_party', 'cares', 'config_openbsd'),) README = os.path.join(PYTHON_STEM, 'README.rst') # Ensure we're in the proper directory whether or not we're being used by pip. diff --git a/src/c-ares/gen_build_yaml.py b/src/c-ares/gen_build_yaml.py index 6f8b7485ac..b7707dc36d 100755 --- a/src/c-ares/gen_build_yaml.py +++ b/src/c-ares/gen_build_yaml.py @@ -33,6 +33,8 @@ try: return 'src/cares/cares/config_linux/ares_config.h' if 'darwin' in sys.platform: return 'src/cares/cares/config_darwin/ares_config.h' + if 'openbsd' in sys.platform: + return 'src/cares/cares/config_openbsd/ares_config.h' if not os.path.isfile('third_party/cares/cares/ares_config.h'): gen_ares_build(x) return 'third_party/cares/cares/ares_config.h' @@ -125,7 +127,8 @@ try: "third_party/cares/cares/setup_once.h", "third_party/cares/ares_build.h", "third_party/cares/config_linux/ares_config.h", - "third_party/cares/config_darwin/ares_config.h" + "third_party/cares/config_darwin/ares_config.h", + "third_party/cares/config_openbsd/ares_config.h" ], }] except: diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 42033d0ba4..1cc6d98491 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -109,6 +109,16 @@ #define GRPC_POSIX_SOCKETUTILS 1 #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 +#elif defined(GPR_OPENBSD) +#define GRPC_HAVE_IFADDRS 1 +#define GRPC_HAVE_IPV6_RECVPKTINFO 1 +#define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_SOCKETUTILS 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_NACL) #define GRPC_HAVE_ARPA_NAMESER 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 diff --git a/src/core/lib/support/time_posix.cc b/src/core/lib/support/time_posix.cc index 3267ea6b54..3f8a9094fd 100644 --- a/src/core/lib/support/time_posix.cc +++ b/src/core/lib/support/time_posix.cc @@ -42,7 +42,7 @@ static struct timespec timespec_from_gpr(gpr_timespec gts) { return rv; } -#if _POSIX_TIMERS > 0 +#if _POSIX_TIMERS > 0 || defined(__OpenBSD__) static gpr_timespec gpr_from_timespec(struct timespec ts, gpr_clock_type clock_type) { /* diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index e5ab02b9fc..9d2cf2a08a 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -41,6 +41,7 @@ LIB_DIRS = [ ] windows = RUBY_PLATFORM =~ /mingw|mswin/ +bsd = RUBY_PLATFORM =~ /bsd/ grpc_root = File.expand_path(File.join(File.dirname(__FILE__), '../../../..')) @@ -70,7 +71,8 @@ unless windows puts 'Building internal gRPC into ' + grpc_lib_dir nproc = 4 nproc = Etc.nprocessors * 2 if Etc.respond_to? :nprocessors - system("make -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=") + make = bsd ? 'gmake' : 'make' + system("#{make} -j#{nproc} -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config} Q=") exit 1 unless $? == 0 end diff --git a/third_party/cares/config_openbsd/ares_config.h b/third_party/cares/config_openbsd/ares_config.h new file mode 100644 index 0000000000..3b3320db8f --- /dev/null +++ b/third_party/cares/config_openbsd/ares_config.h @@ -0,0 +1,502 @@ +/* ares_config.h. Generated from ares_config.h.in by configure. */ +/* ares_config.h.in. Generated from configure.ac by autoheader. */ + +/* Define if building universal (internal helper macro) */ +/* #undef AC_APPLE_UNIVERSAL_BUILD */ + +/* define this if ares is built for a big endian system */ +/* #undef ARES_BIG_ENDIAN */ + +/* when building as static part of libcurl */ +/* #undef BUILDING_LIBCURL */ + +/* Defined for build that exposes internal static functions for testing. */ +/* #undef CARES_EXPOSE_STATICS */ + +/* Defined for build with symbol hiding. */ +#define CARES_SYMBOL_HIDING 1 + +/* Definition to make a library symbol externally visible. */ +#define CARES_SYMBOL_SCOPE_EXTERN __attribute__ ((__visibility__ ("default"))) + +/* the signed version of size_t */ +#define CARES_TYPEOF_ARES_SSIZE_T ssize_t + +/* Use resolver library to configure cares */ +/* #undef CARES_USE_LIBRESOLV */ + +/* if a /etc/inet dir is being used */ +/* #undef ETC_INET */ + +/* Define to the type of arg 2 for gethostname. */ +#define GETHOSTNAME_TYPE_ARG2 size_t + +/* Define to the type qualifier of arg 1 for getnameinfo. */ +#define GETNAMEINFO_QUAL_ARG1 const + +/* Define to the type of arg 1 for getnameinfo. */ +#define GETNAMEINFO_TYPE_ARG1 struct sockaddr * + +/* Define to the type of arg 2 for getnameinfo. */ +#define GETNAMEINFO_TYPE_ARG2 socklen_t + +/* Define to the type of args 4 and 6 for getnameinfo. */ +#define GETNAMEINFO_TYPE_ARG46 size_t + +/* Define to the type of arg 7 for getnameinfo. */ +#define GETNAMEINFO_TYPE_ARG7 int + +/* Specifies the number of arguments to getservbyport_r */ +#define GETSERVBYPORT_R_ARGS 4 + +/* Specifies the size of the buffer to pass to getservbyport_r */ +#define GETSERVBYPORT_R_BUFSIZE sizeof(struct servent_data) + +/* Define to 1 if you have AF_INET6. */ +#define HAVE_AF_INET6 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_ARPA_INET_H 1 + +/* Define to 1 if you have the header file. */ +/* #undef HAVE_ARPA_NAMESER_COMPAT_H */ + +/* Define to 1 if you have the header file. */ +#define HAVE_ARPA_NAMESER_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_ASSERT_H 1 + +/* Define to 1 if you have the `bitncmp' function. */ +/* #undef HAVE_BITNCMP */ + +/* Define to 1 if bool is an available type. */ +#define HAVE_BOOL_T 1 + +/* Define to 1 if you have the clock_gettime function and monotonic timer. */ +#define HAVE_CLOCK_GETTIME_MONOTONIC 1 + +/* Define to 1 if you have the closesocket function. */ +/* #undef HAVE_CLOSESOCKET */ + +/* Define to 1 if you have the CloseSocket camel case function. */ +/* #undef HAVE_CLOSESOCKET_CAMEL */ + +/* Define to 1 if you have the connect function. */ +#define HAVE_CONNECT 1 + +/* define if the compiler supports basic C++11 syntax */ +/* #undef HAVE_CXX11 */ + +/* Define to 1 if you have the header file. */ +#define HAVE_DLFCN_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_ERRNO_H 1 + +/* Define to 1 if you have the fcntl function. */ +#define HAVE_FCNTL 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_FCNTL_H 1 + +/* Define to 1 if you have a working fcntl O_NONBLOCK function. */ +#define HAVE_FCNTL_O_NONBLOCK 1 + +/* Define to 1 if you have the freeaddrinfo function. */ +#define HAVE_FREEADDRINFO 1 + +/* Define to 1 if you have a working getaddrinfo function. */ +#define HAVE_GETADDRINFO 1 + +/* Define to 1 if the getaddrinfo function is threadsafe. */ +/* #undef HAVE_GETADDRINFO_THREADSAFE */ + +/* Define to 1 if you have the getenv function. */ +#define HAVE_GETENV 1 + +/* Define to 1 if you have the gethostbyaddr function. */ +#define HAVE_GETHOSTBYADDR 1 + +/* Define to 1 if you have the gethostbyname function. */ +#define HAVE_GETHOSTBYNAME 1 + +/* Define to 1 if you have the gethostname function. */ +#define HAVE_GETHOSTNAME 1 + +/* Define to 1 if you have the getnameinfo function. */ +#define HAVE_GETNAMEINFO 1 + +/* Define to 1 if you have the getservbyport_r function. */ +#define HAVE_GETSERVBYPORT_R 1 + +/* Define to 1 if you have the `gettimeofday' function. */ +#define HAVE_GETTIMEOFDAY 1 + +/* Define to 1 if you have the `if_indextoname' function. */ +#define HAVE_IF_INDEXTONAME 1 + +/* Define to 1 if you have a IPv6 capable working inet_net_pton function. */ +/* #undef HAVE_INET_NET_PTON */ + +/* Define to 1 if you have a IPv6 capable working inet_ntop function. */ +#define HAVE_INET_NTOP 1 + +/* Define to 1 if you have a IPv6 capable working inet_pton function. */ +#define HAVE_INET_PTON 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_INTTYPES_H 1 + +/* Define to 1 if you have the ioctl function. */ +#define HAVE_IOCTL 1 + +/* Define to 1 if you have the ioctlsocket function. */ +/* #undef HAVE_IOCTLSOCKET */ + +/* Define to 1 if you have the IoctlSocket camel case function. */ +/* #undef HAVE_IOCTLSOCKET_CAMEL */ + +/* Define to 1 if you have a working IoctlSocket camel case FIONBIO function. + */ +/* #undef HAVE_IOCTLSOCKET_CAMEL_FIONBIO */ + +/* Define to 1 if you have a working ioctlsocket FIONBIO function. */ +/* #undef HAVE_IOCTLSOCKET_FIONBIO */ + +/* Define to 1 if you have a working ioctl FIONBIO function. */ +#define HAVE_IOCTL_FIONBIO 1 + +/* Define to 1 if you have a working ioctl SIOCGIFADDR function. */ +#define HAVE_IOCTL_SIOCGIFADDR 1 + +/* Define to 1 if you have the `resolve' library (-lresolve). */ +/* #undef HAVE_LIBRESOLVE */ + +/* Define to 1 if you have the header file. */ +#define HAVE_LIMITS_H 1 + +/* if your compiler supports LL */ +#define HAVE_LL 1 + +/* Define to 1 if the compiler supports the 'long long' data type. */ +#define HAVE_LONGLONG 1 + +/* Define to 1 if you have the malloc.h header file. */ +/* #undef HAVE_MALLOC_H */ + +/* Define to 1 if you have the memory.h header file. */ +#define HAVE_MEMORY_H 1 + +/* Define to 1 if you have the MSG_NOSIGNAL flag. */ +#define HAVE_MSG_NOSIGNAL 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_NETDB_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_NETINET_IN_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_NETINET_TCP_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_NET_IF_H 1 + +/* Define to 1 if you have PF_INET6. */ +#define HAVE_PF_INET6 1 + +/* Define to 1 if you have the recv function. */ +#define HAVE_RECV 1 + +/* Define to 1 if you have the recvfrom function. */ +#define HAVE_RECVFROM 1 + +/* Define to 1 if you have the send function. */ +#define HAVE_SEND 1 + +/* Define to 1 if you have the setsockopt function. */ +#define HAVE_SETSOCKOPT 1 + +/* Define to 1 if you have a working setsockopt SO_NONBLOCK function. */ +/* #undef HAVE_SETSOCKOPT_SO_NONBLOCK */ + +/* Define to 1 if you have the header file. */ +#define HAVE_SIGNAL_H 1 + +/* Define to 1 if sig_atomic_t is an available typedef. */ +#define HAVE_SIG_ATOMIC_T 1 + +/* Define to 1 if sig_atomic_t is already defined as volatile. */ +/* #undef HAVE_SIG_ATOMIC_T_VOLATILE */ + +/* Define to 1 if your struct sockaddr_in6 has sin6_scope_id. */ +#define HAVE_SOCKADDR_IN6_SIN6_SCOPE_ID 1 + +/* Define to 1 if you have the socket function. */ +#define HAVE_SOCKET 1 + +/* Define to 1 if you have the header file. */ +/* #undef HAVE_SOCKET_H */ + +/* Define to 1 if you have the header file. */ +#define HAVE_STDBOOL_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STDINT_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STDLIB_H 1 + +/* Define to 1 if you have the strcasecmp function. */ +#define HAVE_STRCASECMP 1 + +/* Define to 1 if you have the strcmpi function. */ +/* #undef HAVE_STRCMPI */ + +/* Define to 1 if you have the strdup function. */ +#define HAVE_STRDUP 1 + +/* Define to 1 if you have the stricmp function. */ +/* #undef HAVE_STRICMP */ + +/* Define to 1 if you have the header file. */ +#define HAVE_STRINGS_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_STRING_H 1 + +/* Define to 1 if you have the strncasecmp function. */ +#define HAVE_STRNCASECMP 1 + +/* Define to 1 if you have the strncmpi function. */ +/* #undef HAVE_STRNCMPI */ + +/* Define to 1 if you have the strnicmp function. */ +/* #undef HAVE_STRNICMP */ + +/* Define to 1 if you have the header file. */ +/* #undef HAVE_STROPTS_H */ + +/* Define to 1 if you have struct addrinfo. */ +#define HAVE_STRUCT_ADDRINFO 1 + +/* Define to 1 if you have struct in6_addr. */ +#define HAVE_STRUCT_IN6_ADDR 1 + +/* Define to 1 if you have struct sockaddr_in6. */ +#define HAVE_STRUCT_SOCKADDR_IN6 1 + +/* if struct sockaddr_storage is defined */ +#define HAVE_STRUCT_SOCKADDR_STORAGE 1 + +/* Define to 1 if you have the timeval struct. */ +#define HAVE_STRUCT_TIMEVAL 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_IOCTL_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_PARAM_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_SELECT_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_SOCKET_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_STAT_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_TIME_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_TYPES_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_SYS_UIO_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_TIME_H 1 + +/* Define to 1 if you have the header file. */ +#define HAVE_UNISTD_H 1 + +/* Define to 1 if you have the windows.h header file. */ +/* #undef HAVE_WINDOWS_H */ + +/* Define to 1 if you have the winsock2.h header file. */ +/* #undef HAVE_WINSOCK2_H */ + +/* Define to 1 if you have the winsock.h header file. */ +/* #undef HAVE_WINSOCK_H */ + +/* Define to 1 if you have the writev function. */ +#define HAVE_WRITEV 1 + +/* Define to 1 if you have the ws2tcpip.h header file. */ +/* #undef HAVE_WS2TCPIP_H */ + +/* Define to the sub-directory where libtool stores uninstalled libraries. */ +#define LT_OBJDIR ".libs/" + +/* Define to 1 if you need the malloc.h header file even with stdlib.h */ +/* #undef NEED_MALLOC_H */ + +/* Define to 1 if you need the memory.h header file even with stdlib.h */ +/* #undef NEED_MEMORY_H */ + +/* Define to 1 if _REENTRANT preprocessor symbol must be defined. */ +/* #undef NEED_REENTRANT */ + +/* Define to 1 if _THREAD_SAFE preprocessor symbol must be defined. */ +/* #undef NEED_THREAD_SAFE */ + +/* cpu-machine-OS */ +#define OS "x86_64-unknown-openbsd6.2" + +/* Name of package */ +#define PACKAGE "c-ares" + +/* Define to the address where bug reports for this package should be sent. */ +#define PACKAGE_BUGREPORT "c-ares mailing list: http://cool.haxx.se/mailman/listinfo/c-ares" + +/* Define to the full name of this package. */ +#define PACKAGE_NAME "c-ares" + +/* Define to the full name and version of this package. */ +#define PACKAGE_STRING "c-ares 1.13.0" + +/* Define to the one symbol short name of this package. */ +#define PACKAGE_TARNAME "c-ares" + +/* Define to the home page for this package. */ +#define PACKAGE_URL "" + +/* Define to the version of this package. */ +#define PACKAGE_VERSION "1.13.0" + +/* a suitable file/device to read random data from */ +#define RANDOM_FILE "/dev/urandom" + +/* Define to the type qualifier pointed by arg 5 for recvfrom. */ +#define RECVFROM_QUAL_ARG5 + +/* Define to the type of arg 1 for recvfrom. */ +#define RECVFROM_TYPE_ARG1 int + +/* Define to the type pointed by arg 2 for recvfrom. */ +#define RECVFROM_TYPE_ARG2 void + +/* Define to 1 if the type pointed by arg 2 for recvfrom is void. */ +#define RECVFROM_TYPE_ARG2_IS_VOID 1 + +/* Define to the type of arg 3 for recvfrom. */ +#define RECVFROM_TYPE_ARG3 size_t + +/* Define to the type of arg 4 for recvfrom. */ +#define RECVFROM_TYPE_ARG4 int + +/* Define to the type pointed by arg 5 for recvfrom. */ +#define RECVFROM_TYPE_ARG5 struct sockaddr + +/* Define to 1 if the type pointed by arg 5 for recvfrom is void. */ +/* #undef RECVFROM_TYPE_ARG5_IS_VOID */ + +/* Define to the type pointed by arg 6 for recvfrom. */ +#define RECVFROM_TYPE_ARG6 socklen_t + +/* Define to 1 if the type pointed by arg 6 for recvfrom is void. */ +/* #undef RECVFROM_TYPE_ARG6_IS_VOID */ + +/* Define to the function return type for recvfrom. */ +#define RECVFROM_TYPE_RETV ssize_t + +/* Define to the type of arg 1 for recv. */ +#define RECV_TYPE_ARG1 int + +/* Define to the type of arg 2 for recv. */ +#define RECV_TYPE_ARG2 void * + +/* Define to the type of arg 3 for recv. */ +#define RECV_TYPE_ARG3 size_t + +/* Define to the type of arg 4 for recv. */ +#define RECV_TYPE_ARG4 int + +/* Define to the function return type for recv. */ +#define RECV_TYPE_RETV ssize_t + +/* Define as the return type of signal handlers (`int' or `void'). */ +#define RETSIGTYPE void + +/* Define to the type qualifier of arg 2 for send. */ +#define SEND_QUAL_ARG2 const + +/* Define to the type of arg 1 for send. */ +#define SEND_TYPE_ARG1 int + +/* Define to the type of arg 2 for send. */ +#define SEND_TYPE_ARG2 void * + +/* Define to the type of arg 3 for send. */ +#define SEND_TYPE_ARG3 size_t + +/* Define to the type of arg 4 for send. */ +#define SEND_TYPE_ARG4 int + +/* Define to the function return type for send. */ +#define SEND_TYPE_RETV ssize_t + +/* Define to 1 if you have the ANSI C header files. */ +#define STDC_HEADERS 1 + +/* Define to 1 if you can safely include both and . */ +#define TIME_WITH_SYS_TIME 1 + +/* Define to disable non-blocking sockets. */ +/* #undef USE_BLOCKING_SOCKETS */ + +/* Version number of package */ +#define VERSION "1.13.0" + +/* Define to avoid automatic inclusion of winsock.h */ +/* #undef WIN32_LEAN_AND_MEAN */ + +/* Define WORDS_BIGENDIAN to 1 if your processor stores words with the most + significant byte first (like Motorola and SPARC, unlike Intel). */ +#if defined AC_APPLE_UNIVERSAL_BUILD +# if defined __BIG_ENDIAN__ +# define WORDS_BIGENDIAN 1 +# endif +#else +# ifndef WORDS_BIGENDIAN +/* # undef WORDS_BIGENDIAN */ +# endif +#endif + +/* Define to 1 if OS is AIX. */ +#ifndef _ALL_SOURCE +/* # undef _ALL_SOURCE */ +#endif + +/* Enable large inode numbers on Mac OS X 10.5. */ +#ifndef _DARWIN_USE_64_BIT_INODE +# define _DARWIN_USE_64_BIT_INODE 1 +#endif + +/* Number of bits in a file offset, on hosts where this is settable. */ +/* #undef _FILE_OFFSET_BITS */ + +/* Define for large files, on AIX-style hosts. */ +/* #undef _LARGE_FILES */ + +/* Define to empty if `const' does not conform to ANSI C. */ +/* #undef const */ + +/* Type to use in place of in_addr_t when system does not provide it. */ +/* #undef in_addr_t */ + +/* Define to `unsigned int' if does not define. */ +/* #undef size_t */ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 21b3bef691..f3405b8346 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7483,7 +7483,8 @@ "third_party/cares/cares/config-win32.h", "third_party/cares/cares/setup_once.h", "third_party/cares/config_darwin/ares_config.h", - "third_party/cares/config_linux/ares_config.h" + "third_party/cares/config_linux/ares_config.h", + "third_party/cares/config_openbsd/ares_config.h" ], "is_filegroup": false, "language": "c", -- cgit v1.2.3