From a5272b6adc5fb7e8c71b7216b0f5e690980a79b2 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 30 Apr 2015 11:56:46 -0700 Subject: A new version C# API based on async/await --- .../Grpc.IntegrationTesting/InteropClient.cs | 119 +++++++++++---------- .../InteropClientServerTest.cs | 2 +- .../Grpc.IntegrationTesting/TestServiceGrpc.cs | 34 +++--- .../Grpc.IntegrationTesting/TestServiceImpl.cs | 77 ++++--------- 4 files changed, 101 insertions(+), 131 deletions(-) (limited to 'src/csharp/Grpc.IntegrationTesting') diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 1fbae374b1..573ab30452 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -34,6 +34,7 @@ using System; using System.Collections.Generic; using System.Text.RegularExpressions; +using System.Threading.Tasks; using Google.ProtocolBuffers; using grpc.testing; @@ -199,113 +200,115 @@ namespace Grpc.IntegrationTesting public static void RunClientStreaming(TestServiceGrpc.ITestServiceClient client) { - Console.WriteLine("running client_streaming"); + Task.Run(async () => + { + Console.WriteLine("running client_streaming"); - var bodySizes = new List { 27182, 8, 1828, 45904 }; + var bodySizes = new List { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build()); - var context = client.StreamingInputCall(); - foreach (var size in bodySizes) - { - context.Inputs.OnNext( - StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build()); - } - context.Inputs.OnCompleted(); + var call = client.StreamingInputCall(); + await call.RequestStream.WriteAll(bodySizes); - var response = context.Task.Result; - Assert.AreEqual(74922, response.AggregatedPayloadSize); - Console.WriteLine("Passed!"); + var response = await call.Result; + Assert.AreEqual(74922, response.AggregatedPayloadSize); + Console.WriteLine("Passed!"); + }).Wait(); } public static void RunServerStreaming(TestServiceGrpc.ITestServiceClient client) { - Console.WriteLine("running server_streaming"); + Task.Run(async () => + { + Console.WriteLine("running server_streaming"); - var bodySizes = new List { 31415, 9, 2653, 58979 }; + var bodySizes = new List { 31415, 9, 2653, 58979 }; - var request = StreamingOutputCallRequest.CreateBuilder() + var request = StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddRangeResponseParameters(bodySizes.ConvertAll( - (size) => ResponseParameters.CreateBuilder().SetSize(size).Build())) + (size) => ResponseParameters.CreateBuilder().SetSize(size).Build())) .Build(); - var recorder = new RecordingObserver(); - client.StreamingOutputCall(request, recorder); + var call = client.StreamingOutputCall(request); - var responseList = recorder.ToList().Result; - - foreach (var res in responseList) - { - Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); - } - CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); - Console.WriteLine("Passed!"); + var responseList = await call.ResponseStream.ToList(); + foreach (var res in responseList) + { + Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + } + CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); + Console.WriteLine("Passed!"); + }).Wait(); } public static void RunPingPong(TestServiceGrpc.ITestServiceClient client) { - Console.WriteLine("running ping_pong"); + Task.Run(async () => + { + Console.WriteLine("running ping_pong"); - var recorder = new RecordingQueue(); - var inputs = client.FullDuplexCall(recorder); + var call = client.FullDuplexCall(); - StreamingOutputCallResponse response; + StreamingOutputCallResponse response; - inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415)) .SetPayload(CreateZerosPayload(27182)).Build()); - response = recorder.Queue.Take(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(31415, response.Payload.Body.Length); + response = await call.ResponseStream.ReadNext(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(31415, response.Payload.Body.Length); - inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9)) .SetPayload(CreateZerosPayload(8)).Build()); - response = recorder.Queue.Take(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(9, response.Payload.Body.Length); + response = await call.ResponseStream.ReadNext(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(9, response.Payload.Body.Length); - inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653)) .SetPayload(CreateZerosPayload(1828)).Build()); - response = recorder.Queue.Take(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(2653, response.Payload.Body.Length); + response = await call.ResponseStream.ReadNext(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(2653, response.Payload.Body.Length); - inputs.OnNext(StreamingOutputCallRequest.CreateBuilder() + await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder() .SetResponseType(PayloadType.COMPRESSABLE) .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979)) .SetPayload(CreateZerosPayload(45904)).Build()); - response = recorder.Queue.Take(); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); - Assert.AreEqual(58979, response.Payload.Body.Length); + response = await call.ResponseStream.ReadNext(); + Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(58979, response.Payload.Body.Length); - inputs.OnCompleted(); + await call.RequestStream.Close(); - recorder.Finished.Wait(); - Assert.AreEqual(0, recorder.Queue.Count); + response = await call.ResponseStream.ReadNext(); + Assert.AreEqual(null, response); - Console.WriteLine("Passed!"); + Console.WriteLine("Passed!"); + }).Wait(); } public static void RunEmptyStream(TestServiceGrpc.ITestServiceClient client) { - Console.WriteLine("running empty_stream"); - - var recorder = new RecordingObserver(); - var inputs = client.FullDuplexCall(recorder); - inputs.OnCompleted(); + Task.Run(async () => + { + Console.WriteLine("running empty_stream"); + var call = client.FullDuplexCall(); + await call.Close(); - var responseList = recorder.ToList().Result; - Assert.AreEqual(0, responseList.Count); + var responseList = await call.ResponseStream.ToList(); + Assert.AreEqual(0, responseList.Count); - Console.WriteLine("Passed!"); + Console.WriteLine("Passed!"); + }).Wait(); } public static void RunServiceAccountCreds(TestServiceGrpc.ITestServiceClient client) diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs index 1e76d3df21..e929b76b5e 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -87,7 +87,7 @@ namespace Grpc.IntegrationTesting [Test] public void LargeUnary() { - InteropClient.RunEmptyUnary(client); + InteropClient.RunLargeUnary(client); } [Test] diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs index f63e0361a4..d1f8aa12c7 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs @@ -100,13 +100,13 @@ namespace grpc.testing Task UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken)); - void StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver, CancellationToken token = default(CancellationToken)); + AsyncServerStreamingCall StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)); - ClientStreamingAsyncResult StreamingInputCall(CancellationToken token = default(CancellationToken)); + AsyncClientStreamingCall StreamingInputCall(CancellationToken token = default(CancellationToken)); - IObserver FullDuplexCall(IObserver responseObserver, CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall FullDuplexCall(CancellationToken token = default(CancellationToken)); - IObserver HalfDuplexCall(IObserver responseObserver, CancellationToken token = default(CancellationToken)); + AsyncDuplexStreamingCall HalfDuplexCall(CancellationToken token = default(CancellationToken)); } public class TestServiceClientStub : AbstractStub, ITestServiceClient @@ -143,45 +143,45 @@ namespace grpc.testing return Calls.AsyncUnaryCall(call, request, token); } - public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver, CancellationToken token = default(CancellationToken)) + public AsyncServerStreamingCall StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken)) { var call = CreateCall(ServiceName, StreamingOutputCallMethod); - Calls.AsyncServerStreamingCall(call, request, responseObserver, token); + return Calls.AsyncServerStreamingCall(call, request, token); } - public ClientStreamingAsyncResult StreamingInputCall(CancellationToken token = default(CancellationToken)) + public AsyncClientStreamingCall StreamingInputCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(ServiceName, StreamingInputCallMethod); return Calls.AsyncClientStreamingCall(call, token); } - public IObserver FullDuplexCall(IObserver responseObserver, CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall FullDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(ServiceName, FullDuplexCallMethod); - return Calls.DuplexStreamingCall(call, responseObserver, token); + return Calls.AsyncDuplexStreamingCall(call, token); } - public IObserver HalfDuplexCall(IObserver responseObserver, CancellationToken token = default(CancellationToken)) + public AsyncDuplexStreamingCall HalfDuplexCall(CancellationToken token = default(CancellationToken)) { var call = CreateCall(ServiceName, HalfDuplexCallMethod); - return Calls.DuplexStreamingCall(call, responseObserver, token); + return Calls.AsyncDuplexStreamingCall(call, token); } } // server-side interface public interface ITestService { - void EmptyCall(Empty request, IObserver responseObserver); + Task EmptyCall(Empty request); - void UnaryCall(SimpleRequest request, IObserver responseObserver); + Task UnaryCall(SimpleRequest request); - void StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver); + Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter responseStream); - IObserver StreamingInputCall(IObserver responseObserver); + Task StreamingInputCall(IAsyncStreamReader requestStream); - IObserver FullDuplexCall(IObserver responseObserver); + Task FullDuplexCall(IAsyncStreamReader requestStream, IServerStreamWriter responseStream); - IObserver HalfDuplexCall(IObserver responseObserver); + Task HalfDuplexCall(IAsyncStreamReader requestStream, IServerStreamWriter responseStream); } public static ServerServiceDefinition BindService(ITestService serviceImpl) diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs index 661b31b0ee..8b0cf3a2d0 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs @@ -36,6 +36,7 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Google.ProtocolBuffers; +using Grpc.Core; using Grpc.Core.Utils; namespace grpc.testing @@ -45,88 +46,54 @@ namespace grpc.testing /// public class TestServiceImpl : TestServiceGrpc.ITestService { - public void EmptyCall(Empty request, IObserver responseObserver) + public Task EmptyCall(Empty request) { - responseObserver.OnNext(Empty.DefaultInstance); - responseObserver.OnCompleted(); + return Task.FromResult(Empty.DefaultInstance); } - public void UnaryCall(SimpleRequest request, IObserver responseObserver) + public Task UnaryCall(SimpleRequest request) { var response = SimpleResponse.CreateBuilder() .SetPayload(CreateZerosPayload(request.ResponseSize)).Build(); - // TODO: check we support ReponseType - responseObserver.OnNext(response); - responseObserver.OnCompleted(); + return Task.FromResult(response); } - public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver responseObserver) + public async Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter responseStream) { foreach (var responseParam in request.ResponseParametersList) { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - responseObserver.OnNext(response); + await responseStream.Write(response); } - responseObserver.OnCompleted(); } - public IObserver StreamingInputCall(IObserver responseObserver) + public async Task StreamingInputCall(IAsyncStreamReader requestStream) { - var recorder = new RecordingObserver(); - Task.Run(() => + int sum = 0; + await requestStream.ForEach(async request => { - int sum = 0; - foreach (var req in recorder.ToList().Result) - { - sum += req.Payload.Body.Length; - } - var response = StreamingInputCallResponse.CreateBuilder() - .SetAggregatedPayloadSize(sum).Build(); - responseObserver.OnNext(response); - responseObserver.OnCompleted(); + sum += request.Payload.Body.Length; }); - return recorder; + return StreamingInputCallResponse.CreateBuilder().SetAggregatedPayloadSize(sum).Build(); } - public IObserver FullDuplexCall(IObserver responseObserver) + public async Task FullDuplexCall(IAsyncStreamReader requestStream, IServerStreamWriter responseStream) { - return new FullDuplexObserver(responseObserver); - } - - public IObserver HalfDuplexCall(IObserver responseObserver) - { - throw new NotImplementedException(); - } - - private class FullDuplexObserver : IObserver - { - readonly IObserver responseObserver; - - public FullDuplexObserver(IObserver responseObserver) - { - this.responseObserver = responseObserver; - } - - public void OnCompleted() + await requestStream.ForEach(async request => { - responseObserver.OnCompleted(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnNext(StreamingOutputCallRequest value) - { - foreach (var responseParam in value.ResponseParametersList) + foreach (var responseParam in request.ResponseParametersList) { var response = StreamingOutputCallResponse.CreateBuilder() .SetPayload(CreateZerosPayload(responseParam.Size)).Build(); - responseObserver.OnNext(response); + await responseStream.Write(response); } - } + }); + } + + public async Task HalfDuplexCall(IAsyncStreamReader requestStream, IServerStreamWriter responseStream) + { + throw new NotImplementedException(); } private static Payload CreateZerosPayload(int size) -- cgit v1.2.3