diff options
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs')
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs | 77 |
1 files changed, 22 insertions, 55 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs index 661b31b0ee..8b0cf3a2d0 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -36,6 +36,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Google.ProtocolBuffers; +using Grpc.Core; using Grpc.Core.Utils; namespace grpc.testing @@ -45,88 +46,54 @@ namespace grpc.testing /// </summary> public class TestServiceImpl : TestServiceGrpc.ITestService { - public void EmptyCall(Empty request, IObserver<Empty> responseObserver) + public Task<Empty> EmptyCall(Empty request) { - responseObserver.OnNext(Empty.DefaultInstance); - responseObserver.OnCompleted(); + return Task.FromResult(Empty.DefaultInstance); } - public void UnaryCall(SimpleRequest request, IObserver<SimpleResponse> responseObserver) + public Task<SimpleResponse> UnaryCall(SimpleRequest request) { var response = SimpleResponse.CreateBuilder() .SetPayload(CreateZerosPayload(request.ResponseSize)).Build(); - // TODO: check we support ReponseType - responseObserver.OnNext(response); - responseObserver.OnCompleted(); + return Task.FromResult(response); } - public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver) + public async Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream) { foreach (var responseParam in request.ResponseParametersList) { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - responseObserver.OnNext(response); + await responseStream.Write(response); } - responseObserver.OnCompleted(); } - public IObserver<StreamingInputCallRequest> StreamingInputCall(IObserver<StreamingInputCallResponse> responseObserver) + public async Task<StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream) { - var recorder = new RecordingObserver<StreamingInputCallRequest>(); - Task.Run(() => + int sum = 0; + await requestStream.ForEach(async request => { - int sum = 0; - foreach (var req in recorder.ToList().Result) - { - sum += req.Payload.Body.Length; - } - var response = StreamingInputCallResponse.CreateBuilder() - .SetAggregatedPayloadSize(sum).Build(); - responseObserver.OnNext(response); - responseObserver.OnCompleted(); + sum += request.Payload.Body.Length; }); - return recorder; + return StreamingInputCallResponse.CreateBuilder().SetAggregatedPayloadSize(sum).Build(); } - public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver) + public async Task FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream) { - return new FullDuplexObserver(responseObserver); - } - - public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver) - { - throw new NotImplementedException(); - } - - private class FullDuplexObserver : IObserver<StreamingOutputCallRequest> - { - readonly IObserver<StreamingOutputCallResponse> responseObserver; - - public FullDuplexObserver(IObserver<StreamingOutputCallResponse> responseObserver) - { - this.responseObserver = responseObserver; - } - - public void OnCompleted() + await requestStream.ForEach(async request => { - responseObserver.OnCompleted(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnNext(StreamingOutputCallRequest value) - { - foreach (var responseParam in value.ResponseParametersList) + foreach (var responseParam in request.ResponseParametersList) { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - responseObserver.OnNext(response); + await responseStream.Write(response); } - } + }); + } + + public async Task HalfDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream) + { + throw new NotImplementedException(); } private static Payload CreateZerosPayload(int size) |