aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.IntegrationTesting
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-04-30 11:56:46 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-05-04 09:21:37 -0700
commita5272b6adc5fb7e8c71b7216b0f5e690980a79b2 (patch)
tree41025f6975bc0058b8bce500d04c01d6546f882c /src/csharp/Grpc.IntegrationTesting
parent520ecb18f5b400b9c4e44a56acacc098cfaa7f77 (diff)
A new version C# API based on async/await
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs119
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs34
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs77
4 files changed, 101 insertions, 131 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 1fbae374b1..573ab30452 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -34,6 +34,7 @@
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
+using System.Threading.Tasks;
using Google.ProtocolBuffers;
using grpc.testing;
@@ -199,113 +200,115 @@ namespace Grpc.IntegrationTesting
public static void RunClientStreaming(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running client_streaming");
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running client_streaming");
- var bodySizes = new List<int> { 27182, 8, 1828, 45904 };
+ var bodySizes = new List<int> { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build());
- var context = client.StreamingInputCall();
- foreach (var size in bodySizes)
- {
- context.Inputs.OnNext(
- StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build());
- }
- context.Inputs.OnCompleted();
+ var call = client.StreamingInputCall();
+ await call.RequestStream.WriteAll(bodySizes);
- var response = context.Task.Result;
- Assert.AreEqual(74922, response.AggregatedPayloadSize);
- Console.WriteLine("Passed!");
+ var response = await call.Result;
+ Assert.AreEqual(74922, response.AggregatedPayloadSize);
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunServerStreaming(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running server_streaming");
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running server_streaming");
- var bodySizes = new List<int> { 31415, 9, 2653, 58979 };
+ var bodySizes = new List<int> { 31415, 9, 2653, 58979 };
- var request = StreamingOutputCallRequest.CreateBuilder()
+ var request = StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddRangeResponseParameters(bodySizes.ConvertAll(
- (size) => ResponseParameters.CreateBuilder().SetSize(size).Build()))
+ (size) => ResponseParameters.CreateBuilder().SetSize(size).Build()))
.Build();
- var recorder = new RecordingObserver<StreamingOutputCallResponse>();
- client.StreamingOutputCall(request, recorder);
+ var call = client.StreamingOutputCall(request);
- var responseList = recorder.ToList().Result;
-
- foreach (var res in responseList)
- {
- Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
- }
- CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
- Console.WriteLine("Passed!");
+ var responseList = await call.ResponseStream.ToList();
+ foreach (var res in responseList)
+ {
+ Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
+ }
+ CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunPingPong(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running ping_pong");
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running ping_pong");
- var recorder = new RecordingQueue<StreamingOutputCallResponse>();
- var inputs = client.FullDuplexCall(recorder);
+ var call = client.FullDuplexCall();
- StreamingOutputCallResponse response;
+ StreamingOutputCallResponse response;
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(31415, response.Payload.Body.Length);
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
.SetPayload(CreateZerosPayload(8)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(9, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(9, response.Payload.Body.Length);
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
.SetPayload(CreateZerosPayload(1828)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(2653, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(2653, response.Payload.Body.Length);
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
.SetPayload(CreateZerosPayload(45904)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(58979, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(58979, response.Payload.Body.Length);
- inputs.OnCompleted();
+ await call.RequestStream.Close();
- recorder.Finished.Wait();
- Assert.AreEqual(0, recorder.Queue.Count);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(null, response);
- Console.WriteLine("Passed!");
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunEmptyStream(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running empty_stream");
-
- var recorder = new RecordingObserver<StreamingOutputCallResponse>();
- var inputs = client.FullDuplexCall(recorder);
- inputs.OnCompleted();
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running empty_stream");
+ var call = client.FullDuplexCall();
+ await call.Close();
- var responseList = recorder.ToList().Result;
- Assert.AreEqual(0, responseList.Count);
+ var responseList = await call.ResponseStream.ToList();
+ Assert.AreEqual(0, responseList.Count);
- Console.WriteLine("Passed!");
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunServiceAccountCreds(TestServiceGrpc.ITestServiceClient client)
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 1e76d3df21..e929b76b5e 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -87,7 +87,7 @@ namespace Grpc.IntegrationTesting
[Test]
public void LargeUnary()
{
- InteropClient.RunEmptyUnary(client);
+ InteropClient.RunLargeUnary(client);
}
[Test]
diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs
index f63e0361a4..d1f8aa12c7 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestServiceGrpc.cs
@@ -100,13 +100,13 @@ namespace grpc.testing
Task<SimpleResponse> UnaryCallAsync(SimpleRequest request, CancellationToken token = default(CancellationToken));
- void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
+ AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken));
- ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
+ AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken));
- IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken));
- IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken));
+ AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken));
}
public class TestServiceClientStub : AbstractStub<TestServiceClientStub, StubConfiguration>, ITestServiceClient
@@ -143,45 +143,45 @@ namespace grpc.testing
return Calls.AsyncUnaryCall(call, request, token);
}
- public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
+ public AsyncServerStreamingCall<StreamingOutputCallResponse> StreamingOutputCall(StreamingOutputCallRequest request, CancellationToken token = default(CancellationToken))
{
var call = CreateCall(ServiceName, StreamingOutputCallMethod);
- Calls.AsyncServerStreamingCall(call, request, responseObserver, token);
+ return Calls.AsyncServerStreamingCall(call, request, token);
}
- public ClientStreamingAsyncResult<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
+ public AsyncClientStreamingCall<StreamingInputCallRequest, StreamingInputCallResponse> StreamingInputCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(ServiceName, StreamingInputCallMethod);
return Calls.AsyncClientStreamingCall(call, token);
}
- public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> FullDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(ServiceName, FullDuplexCallMethod);
- return Calls.DuplexStreamingCall(call, responseObserver, token);
+ return Calls.AsyncDuplexStreamingCall(call, token);
}
- public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver, CancellationToken token = default(CancellationToken))
+ public AsyncDuplexStreamingCall<StreamingOutputCallRequest, StreamingOutputCallResponse> HalfDuplexCall(CancellationToken token = default(CancellationToken))
{
var call = CreateCall(ServiceName, HalfDuplexCallMethod);
- return Calls.DuplexStreamingCall(call, responseObserver, token);
+ return Calls.AsyncDuplexStreamingCall(call, token);
}
}
// server-side interface
public interface ITestService
{
- void EmptyCall(Empty request, IObserver<Empty> responseObserver);
+ Task<Empty> EmptyCall(Empty request);
- void UnaryCall(SimpleRequest request, IObserver<SimpleResponse> responseObserver);
+ Task<SimpleResponse> UnaryCall(SimpleRequest request);
- void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver);
+ Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
- IObserver<StreamingInputCallRequest> StreamingInputCall(IObserver<StreamingInputCallResponse> responseObserver);
+ Task<StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream);
- IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver);
+ Task FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
- IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver);
+ Task HalfDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream);
}
public static ServerServiceDefinition BindService(ITestService serviceImpl)
diff --git a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
index 661b31b0ee..8b0cf3a2d0 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestServiceImpl.cs
@@ -36,6 +36,7 @@ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.ProtocolBuffers;
+using Grpc.Core;
using Grpc.Core.Utils;
namespace grpc.testing
@@ -45,88 +46,54 @@ namespace grpc.testing
/// </summary>
public class TestServiceImpl : TestServiceGrpc.ITestService
{
- public void EmptyCall(Empty request, IObserver<Empty> responseObserver)
+ public Task<Empty> EmptyCall(Empty request)
{
- responseObserver.OnNext(Empty.DefaultInstance);
- responseObserver.OnCompleted();
+ return Task.FromResult(Empty.DefaultInstance);
}
- public void UnaryCall(SimpleRequest request, IObserver<SimpleResponse> responseObserver)
+ public Task<SimpleResponse> UnaryCall(SimpleRequest request)
{
var response = SimpleResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(request.ResponseSize)).Build();
- // TODO: check we support ReponseType
- responseObserver.OnNext(response);
- responseObserver.OnCompleted();
+ return Task.FromResult(response);
}
- public void StreamingOutputCall(StreamingOutputCallRequest request, IObserver<StreamingOutputCallResponse> responseObserver)
+ public async Task StreamingOutputCall(StreamingOutputCallRequest request, IServerStreamWriter<StreamingOutputCallResponse> responseStream)
{
foreach (var responseParam in request.ResponseParametersList)
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
- responseObserver.OnNext(response);
+ await responseStream.Write(response);
}
- responseObserver.OnCompleted();
}
- public IObserver<StreamingInputCallRequest> StreamingInputCall(IObserver<StreamingInputCallResponse> responseObserver)
+ public async Task<StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<StreamingInputCallRequest> requestStream)
{
- var recorder = new RecordingObserver<StreamingInputCallRequest>();
- Task.Run(() =>
+ int sum = 0;
+ await requestStream.ForEach(async request =>
{
- int sum = 0;
- foreach (var req in recorder.ToList().Result)
- {
- sum += req.Payload.Body.Length;
- }
- var response = StreamingInputCallResponse.CreateBuilder()
- .SetAggregatedPayloadSize(sum).Build();
- responseObserver.OnNext(response);
- responseObserver.OnCompleted();
+ sum += request.Payload.Body.Length;
});
- return recorder;
+ return StreamingInputCallResponse.CreateBuilder().SetAggregatedPayloadSize(sum).Build();
}
- public IObserver<StreamingOutputCallRequest> FullDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver)
+ public async Task FullDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream)
{
- return new FullDuplexObserver(responseObserver);
- }
-
- public IObserver<StreamingOutputCallRequest> HalfDuplexCall(IObserver<StreamingOutputCallResponse> responseObserver)
- {
- throw new NotImplementedException();
- }
-
- private class FullDuplexObserver : IObserver<StreamingOutputCallRequest>
- {
- readonly IObserver<StreamingOutputCallResponse> responseObserver;
-
- public FullDuplexObserver(IObserver<StreamingOutputCallResponse> responseObserver)
- {
- this.responseObserver = responseObserver;
- }
-
- public void OnCompleted()
+ await requestStream.ForEach(async request =>
{
- responseObserver.OnCompleted();
- }
-
- public void OnError(Exception error)
- {
- throw new NotImplementedException();
- }
-
- public void OnNext(StreamingOutputCallRequest value)
- {
- foreach (var responseParam in value.ResponseParametersList)
+ foreach (var responseParam in request.ResponseParametersList)
{
var response = StreamingOutputCallResponse.CreateBuilder()
.SetPayload(CreateZerosPayload(responseParam.Size)).Build();
- responseObserver.OnNext(response);
+ await responseStream.Write(response);
}
- }
+ });
+ }
+
+ public async Task HalfDuplexCall(IAsyncStreamReader<StreamingOutputCallRequest> requestStream, IServerStreamWriter<StreamingOutputCallResponse> responseStream)
+ {
+ throw new NotImplementedException();
}
private static Payload CreateZerosPayload(int size)