aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs77
1 files changed, 22 insertions, 55 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
index 661b31b0ee..8b0cf3a2d0 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
@@ -36,6 +36,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.ProtocolBuffers;
+using Grpc.Core;
using Grpc.Core.Utils;
namespace grpc.testing
@@ -45,88 +46,54 @@ namespace grpc.testing
/// </summary>
public class TestServiceImpl : TestServiceGrpc.ITestService
{
- public void EmptyCall(Empty request, IObserver<Empty> responseObserver)
+ public Task<Empty> EmptyCall(Empty request)
{
- responseObserver.OnNext(Empty.DefaultInstance);
- responseObserver.OnCompleted();
+ return Task.FromResult(Empty.DefaultInstance);
}
- public void UnaryCall(SimpleRequest request, IObserver<SimpleResponse> responseObserver)
+ public Task<SimpleResponse> UnaryCall(SimpleRequest request)
{
var response = SimpleResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(request.ResponseSize)).Build();
- // TODO: check we support ReponseType
- responseObserver.OnNext(response);
- responseObserver.OnCompleted();
+ return Task.FromResult(response);
}
- public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver)
+ public async Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream)
{
foreach (var responseParam in request.ResponseParametersList)
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
- responseObserver.OnNext(response);
+ await responseStream.Write(response);
}
- responseObserver.OnCompleted();
}
- public IObserver<StreamingInputCallRequest> StreamingInputCall(IObserver<StreamingInputCallResponse> responseObserver)
+ public async Task<StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream)
{
- var recorder = new RecordingObserver<StreamingInputCallRequest>();
- Task.Run(() =>
+ int sum = 0;
+ await requestStream.ForEach(async request =>
{
- int sum = 0;
- foreach (var req in recorder.ToList().Result)
- {
- sum += req.Payload.Body.Length;
- }
- var response = StreamingInputCallResponse.CreateBuilder()
- .SetAggregatedPayloadSize(sum).Build();
- responseObserver.OnNext(response);
- responseObserver.OnCompleted();
+ sum += request.Payload.Body.Length;
});
- return recorder;
+ return StreamingInputCallResponse.CreateBuilder().SetAggregatedPayloadSize(sum).Build();
}
- public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver)
+ public async Task FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream)
{
- return new FullDuplexObserver(responseObserver);
- }
-
- public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver)
- {
- throw new NotImplementedException();
- }
-
- private class FullDuplexObserver : IObserver<StreamingOutputCallRequest>
- {
- readonly IObserver<StreamingOutputCallResponse> responseObserver;
-
- public FullDuplexObserver(IObserver<StreamingOutputCallResponse> responseObserver)
- {
- this.responseObserver = responseObserver;
- }
-
- public void OnCompleted()
+ await requestStream.ForEach(async request =>
{
- responseObserver.OnCompleted();
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnNext(StreamingOutputCallRequest value)
- {
- foreach (var responseParam in value.ResponseParametersList)
+ foreach (var responseParam in request.ResponseParametersList)
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
- responseObserver.OnNext(response);
+ await responseStream.Write(response);
}
- }
+ });
+ }
+
+ public async Task HalfDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream)
+ {
+ throw new NotImplementedException();
}
private static Payload CreateZerosPayload(int size)