diff options
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 17 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ClientResponseStream.cs | 2 | ||||
-rwxr-xr-x | tools/run_tests/run_performance_tests.py | 14 |
3 files changed, 21 insertions, 12 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 49d0a111ef..5e61e9ec12 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -52,9 +52,8 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; - // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls. - // Indicates that response streaming call has finished. - TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); + // Completion of a streaming response call if not null. + TaskCompletionSource<object> streamingResponseCallFinishedTcs; // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers). // Response headers set here once received. @@ -198,6 +197,7 @@ namespace Grpc.Core.Internal byte[] payload = UnsafeSerialize(msg); + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); @@ -219,6 +219,7 @@ namespace Grpc.Core.Internal Initialize(details.Channel.CompletionQueue); + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); @@ -276,13 +277,13 @@ namespace Grpc.Core.Internal } /// <summary> - /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise. + /// Get the task that completes once if streaming response call finishes with ok status and throws RpcException with given status otherwise. /// </summary> - public Task StreamingCallFinishedTask + public Task StreamingResponseCallFinishedTask { get { - return streamingCallFinishedTcs.Task; + return streamingResponseCallFinishedTcs.Task; } } @@ -529,11 +530,11 @@ namespace Grpc.Core.Internal var status = receivedStatus.Status; if (status.StatusCode != StatusCode.OK) { - streamingCallFinishedTcs.SetException(new RpcException(status)); + streamingResponseCallFinishedTcs.SetException(new RpcException(status)); return; } - streamingCallFinishedTcs.SetResult(null); + streamingResponseCallFinishedTcs.SetResult(null); } } } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index ad9423ff58..65bf60269a 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -73,7 +73,7 @@ namespace Grpc.Core.Internal if (result == null) { - await call.StreamingCallFinishedTask.ConfigureAwait(false); + await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); return false; } return true; diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py index 099ab89ddf..7cb827ecea 100755 --- a/tools/run_tests/run_performance_tests.py +++ b/tools/run_tests/run_performance_tests.py @@ -42,6 +42,7 @@ import os import performance.scenario_config as scenario_config import pipes import re +import report_utils import subprocess import sys import tempfile @@ -453,6 +454,7 @@ if not scenarios: total_scenario_failures = 0 qps_workers_killed = 0 +merged_resultset = {} for scenario in scenarios: if args.dry_run: print(scenario.name) @@ -460,14 +462,20 @@ for scenario in scenarios: try: for worker in scenario.workers: worker.start() - scenario_failures, _ = jobset.run([scenario.jobspec, - create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)], - newline_on_success=True, maxjobs=1) + scenario_failures, resultset = jobset.run([scenario.jobspec, + create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)], + newline_on_success=True, maxjobs=1) total_scenario_failures += scenario_failures + merged_resultset = dict(itertools.chain(merged_resultset.iteritems(), + resultset.iteritems())) finally: # Consider qps workers that need to be killed as failures qps_workers_killed += finish_qps_workers(scenario.workers) + +report_utils.render_junit_xml_report(merged_resultset, 'report.xml', + suite_name='benchmarks') + if total_scenario_failures > 0 or qps_workers_killed > 0: print ("%s scenarios failed and %s qps worker jobs killed" % (total_scenario_failures, qps_workers_killed)) sys.exit(1) |