diff options
-rw-r--r-- | src/core/lib/debug/stats_data.cc | 9 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.h | 12 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data.yaml | 10 | ||||
-rw-r--r-- | src/core/lib/debug/stats_data_bq_schema.sql | 5 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 15 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncClientStreamingCall.cs | 16 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs | 16 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncServerStreamingCall.cs | 14 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncUnaryCall.cs | 15 | ||||
-rw-r--r-- | tools/run_tests/performance/massage_qps_stats.py | 3 | ||||
-rw-r--r-- | tools/run_tests/performance/scenario_result_schema.json | 30 |
11 files changed, 139 insertions, 6 deletions
diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc index 5bd7884e28..b4ae8f312c 100644 --- a/src/core/lib/debug/stats_data.cc +++ b/src/core/lib/debug/stats_data.cc @@ -112,6 +112,9 @@ const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "executor_push_retries", "server_requested_calls", "server_slowpath_requests_queued", + "cq_ev_queue_trylock_failures", + "cq_ev_queue_trylock_successes", + "cq_ev_queue_transient_pop_failures", }; const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "Number of client side calls created by this process", @@ -222,6 +225,12 @@ const char *grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "How many calls were requested (not necessarily received) by the server", "How many times was the server slow path taken (indicates too few " "outstanding requests)", + "Number of lock (trylock) acquisition failures on completion queue event " + "queue. High value here indicates high contention on completion queues", + "Number of lock (trylock) acquisition successes on completion queue event " + "queue.", + "Number of times NULL was popped out of completion queue's event queue " + "even though the event queue was not empty", }; const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = { "call_initial_size", diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index d8e4e7d264..d4c4437ca0 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -118,6 +118,9 @@ typedef enum { GRPC_STATS_COUNTER_EXECUTOR_PUSH_RETRIES, GRPC_STATS_COUNTER_SERVER_REQUESTED_CALLS, GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES, + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES, GRPC_STATS_COUNTER_COUNT } grpc_stats_counters; extern const char *grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT]; @@ -425,6 +428,15 @@ typedef enum { #define GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED(exec_ctx) \ GRPC_STATS_INC_COUNTER((exec_ctx), \ GRPC_STATS_COUNTER_SERVER_SLOWPATH_REQUESTS_QUEUED) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_FAILURES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(exec_ctx) \ + GRPC_STATS_INC_COUNTER((exec_ctx), \ + GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRYLOCK_SUCCESSES) +#define GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(exec_ctx) \ + GRPC_STATS_INC_COUNTER( \ + (exec_ctx), GRPC_STATS_COUNTER_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES) #define GRPC_STATS_INC_CALL_INITIAL_SIZE(exec_ctx, value) \ grpc_stats_inc_call_initial_size((exec_ctx), (int)(value)) void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int x); diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index 5c0ab2262e..00e81e74e2 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -272,4 +272,14 @@ - counter: server_slowpath_requests_queued doc: How many times was the server slow path taken (indicates too few outstanding requests) +# cq +- counter: cq_ev_queue_trylock_failures + doc: Number of lock (trylock) acquisition failures on completion queue event + queue. High value here indicates high contention on completion queues +- counter: cq_ev_queue_trylock_successes + doc: Number of lock (trylock) acquisition successes on completion queue event + queue. +- counter: cq_ev_queue_transient_pop_failures + doc: Number of times NULL was popped out of completion queue's event queue + even though the event queue was not empty diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 54869977b0..f34b0c25a0 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -86,4 +86,7 @@ executor_wakeup_initiated_per_iteration:FLOAT, executor_queue_drained_per_iteration:FLOAT, executor_push_retries_per_iteration:FLOAT, server_requested_calls_per_iteration:FLOAT, -server_slowpath_requests_queued_per_iteration:FLOAT +server_slowpath_requests_queued_per_iteration:FLOAT, +cq_ev_queue_trylock_failures_per_iteration:FLOAT, +cq_ev_queue_trylock_successes_per_iteration:FLOAT, +cq_ev_queue_transient_pop_failures_per_iteration:FLOAT diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 36b4b835f8..21664f03c8 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -362,11 +362,24 @@ static bool cq_event_queue_push(grpc_cq_event_queue *q, grpc_cq_completion *c) { static grpc_cq_completion *cq_event_queue_pop(grpc_cq_event_queue *q) { grpc_cq_completion *c = NULL; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + if (gpr_spinlock_trylock(&q->queue_lock)) { - c = (grpc_cq_completion *)gpr_mpscq_pop(&q->queue); + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES(&exec_ctx); + + bool is_empty = false; + c = (grpc_cq_completion *)gpr_mpscq_pop_and_check_end(&q->queue, &is_empty); gpr_spinlock_unlock(&q->queue_lock); + + if (c == NULL && !is_empty) { + GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES(&exec_ctx); + } + } else { + GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES(&exec_ctx); } + grpc_exec_ctx_finish(&exec_ctx); + if (c) { gpr_atm_no_barrier_fetch_add(&q->num_queue_items, -1); } diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index 087b685963..f59989655e 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -36,7 +36,21 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + /// <summary> + /// Creates a new AsyncClientStreamingCall object with the specified properties. + /// </summary> + /// <param name="requestStream">Stream of request values.</param> + /// <param name="responseAsync">The response of the asynchronous call.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, + Task<TResponse> responseAsync, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.requestStream = requestStream; this.responseAsync = responseAsync; diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index ce49fb1596..1cb1a91859 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -35,7 +35,21 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + /// <summary> + /// Creates a new AsyncDuplexStreamingCall object with the specified properties. + /// </summary> + /// <param name="requestStream">Stream of request values.</param> + /// <param name="responseStream">Stream of response values.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, + IAsyncStreamReader<TResponse> responseStream, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index fbc97b8148..4303b0b1b0 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -33,7 +33,19 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + /// <summary> + /// Creates a new AsyncDuplexStreamingCall object with the specified properties. + /// </summary> + /// <param name="responseStream">Stream of response values.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.responseStream = responseStream; this.responseHeadersAsync = responseHeadersAsync; diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 6348f3c5fd..17747f86ca 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -34,7 +34,20 @@ namespace Grpc.Core readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - internal AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + + /// <summary> + /// Creates a new AsyncUnaryCall object with the specified properties. + /// </summary> + /// <param name="responseAsync">The response of the asynchronous call.</param> + /// <param name="responseHeadersAsync">Response headers of the asynchronous call.</param> + /// <param name="getStatusFunc">Delegate returning the status of the call.</param> + /// <param name="getTrailersFunc">Delegate returning the trailing metadata of the call.</param> + /// <param name="disposeAction">Delegate to invoke when Dispose is called on the call object.</param> + public AsyncUnaryCall(Task<TResponse> responseAsync, + Task<Metadata> responseHeadersAsync, + Func<Status> getStatusFunc, + Func<Metadata> getTrailersFunc, + Action disposeAction) { this.responseAsync = responseAsync; this.responseHeadersAsync = responseHeadersAsync; diff --git a/tools/run_tests/performance/massage_qps_stats.py b/tools/run_tests/performance/massage_qps_stats.py index ebbfe6c26c..e0b6ce6ba6 100644 --- a/tools/run_tests/performance/massage_qps_stats.py +++ b/tools/run_tests/performance/massage_qps_stats.py @@ -109,6 +109,9 @@ def massage_qps_stats(scenario_result): stats["core_executor_push_retries"] = massage_qps_stats_helpers.counter(core_stats, "executor_push_retries") stats["core_server_requested_calls"] = massage_qps_stats_helpers.counter(core_stats, "server_requested_calls") stats["core_server_slowpath_requests_queued"] = massage_qps_stats_helpers.counter(core_stats, "server_slowpath_requests_queued") + stats["core_cq_ev_queue_trylock_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_failures") + stats["core_cq_ev_queue_trylock_successes"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_trylock_successes") + stats["core_cq_ev_queue_transient_pop_failures"] = massage_qps_stats_helpers.counter(core_stats, "cq_ev_queue_transient_pop_failures") h = massage_qps_stats_helpers.histogram(core_stats, "call_initial_size") stats["core_call_initial_size"] = ",".join("%f" % x for x in h.buckets) stats["core_call_initial_size_bkts"] = ",".join("%f" % x for x in h.boundaries) diff --git a/tools/run_tests/performance/scenario_result_schema.json b/tools/run_tests/performance/scenario_result_schema.json index 169221d18c..f11e6359f6 100644 --- a/tools/run_tests/performance/scenario_result_schema.json +++ b/tools/run_tests/performance/scenario_result_schema.json @@ -557,6 +557,21 @@ }, { "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_successes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_transient_pop_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", "name": "core_call_initial_size", "type": "STRING" }, @@ -1354,6 +1369,21 @@ }, { "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_trylock_successes", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "core_cq_ev_queue_transient_pop_failures", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", "name": "core_call_initial_size", "type": "STRING" }, |