aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.IntegrationTesting/InteropClient.cs')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs186
1 files changed, 128 insertions, 58 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 1fbae374b1..a433659a08 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -34,6 +34,8 @@
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
+using System.Threading;
+using System.Threading.Tasks;
using Google.ProtocolBuffers;
using grpc.testing;
@@ -165,6 +167,12 @@ namespace Grpc.IntegrationTesting
case "compute_engine_creds":
RunComputeEngineCreds(client);
break;
+ case "cancel_after_begin":
+ RunCancelAfterBegin(client);
+ break;
+ case "cancel_after_first_response":
+ RunCancelAfterFirstResponse(client);
+ break;
case "benchmark_empty_unary":
RunBenchmarkEmptyUnary(client);
break;
@@ -199,113 +207,115 @@ namespace Grpc.IntegrationTesting
public static void RunClientStreaming(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running client_streaming");
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running client_streaming");
- var bodySizes = new List<int> { 27182, 8, 1828, 45904 };
+ var bodySizes = new List<int> { 27182, 8, 1828, 45904 }.ConvertAll((size) => StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build());
- var context = client.StreamingInputCall();
- foreach (var size in bodySizes)
- {
- context.Inputs.OnNext(
- StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build());
- }
- context.Inputs.OnCompleted();
+ var call = client.StreamingInputCall();
+ await call.RequestStream.WriteAll(bodySizes);
- var response = context.Task.Result;
- Assert.AreEqual(74922, response.AggregatedPayloadSize);
- Console.WriteLine("Passed!");
+ var response = await call.Result;
+ Assert.AreEqual(74922, response.AggregatedPayloadSize);
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunServerStreaming(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running server_streaming");
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running server_streaming");
- var bodySizes = new List<int> { 31415, 9, 2653, 58979 };
+ var bodySizes = new List<int> { 31415, 9, 2653, 58979 };
- var request = StreamingOutputCallRequest.CreateBuilder()
+ var request = StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddRangeResponseParameters(bodySizes.ConvertAll(
- (size) => ResponseParameters.CreateBuilder().SetSize(size).Build()))
+ (size) => ResponseParameters.CreateBuilder().SetSize(size).Build()))
.Build();
- var recorder = new RecordingObserver<StreamingOutputCallResponse>();
- client.StreamingOutputCall(request, recorder);
-
- var responseList = recorder.ToList().Result;
+ var call = client.StreamingOutputCall(request);
- foreach (var res in responseList)
- {
- Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
- }
- CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
- Console.WriteLine("Passed!");
+ var responseList = await call.ResponseStream.ToList();
+ foreach (var res in responseList)
+ {
+ Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
+ }
+ CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunPingPong(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running ping_pong");
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running ping_pong");
- var recorder = new RecordingQueue<StreamingOutputCallResponse>();
- var inputs = client.FullDuplexCall(recorder);
+ var call = client.FullDuplexCall();
- StreamingOutputCallResponse response;
+ StreamingOutputCallResponse response;
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(31415, response.Payload.Body.Length);
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
.SetPayload(CreateZerosPayload(8)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(9, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(9, response.Payload.Body.Length);
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
.SetPayload(CreateZerosPayload(1828)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(2653, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(2653, response.Payload.Body.Length);
- inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
.SetPayload(CreateZerosPayload(45904)).Build());
- response = recorder.Queue.Take();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(58979, response.Payload.Body.Length);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(58979, response.Payload.Body.Length);
- inputs.OnCompleted();
+ await call.RequestStream.Close();
- recorder.Finished.Wait();
- Assert.AreEqual(0, recorder.Queue.Count);
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(null, response);
- Console.WriteLine("Passed!");
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunEmptyStream(TestServiceGrpc.ITestServiceClient client)
{
- Console.WriteLine("running empty_stream");
-
- var recorder = new RecordingObserver<StreamingOutputCallResponse>();
- var inputs = client.FullDuplexCall(recorder);
- inputs.OnCompleted();
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running empty_stream");
+ var call = client.FullDuplexCall();
+ await call.Close();
- var responseList = recorder.ToList().Result;
- Assert.AreEqual(0, responseList.Count);
+ var responseList = await call.ResponseStream.ToList();
+ Assert.AreEqual(0, responseList.Count);
- Console.WriteLine("Passed!");
+ Console.WriteLine("Passed!");
+ }).Wait();
}
public static void RunServiceAccountCreds(TestServiceGrpc.ITestServiceClient client)
@@ -348,6 +358,66 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
+ public static void RunCancelAfterBegin(TestServiceGrpc.ITestServiceClient client)
+ {
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running cancel_after_begin");
+
+ var cts = new CancellationTokenSource();
+ var call = client.StreamingInputCall(cts.Token);
+ // TODO(jtattermusch): we need this to ensure call has been initiated once we cancel it.
+ await Task.Delay(1000);
+ cts.Cancel();
+
+ try
+ {
+ var response = await call.Result;
+ Assert.Fail();
+ }
+ catch (RpcException e)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
+ }
+ Console.WriteLine("Passed!");
+ }).Wait();
+ }
+
+ public static void RunCancelAfterFirstResponse(TestServiceGrpc.ITestServiceClient client)
+ {
+ Task.Run(async () =>
+ {
+ Console.WriteLine("running cancel_after_first_response");
+
+ var cts = new CancellationTokenSource();
+ var call = client.FullDuplexCall(cts.Token);
+
+ StreamingOutputCallResponse response;
+
+ await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
+ .SetPayload(CreateZerosPayload(27182)).Build());
+
+ response = await call.ResponseStream.ReadNext();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(31415, response.Payload.Body.Length);
+
+ cts.Cancel();
+
+ try
+ {
+ response = await call.ResponseStream.ReadNext();
+ Assert.Fail();
+ }
+ catch (RpcException e)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, e.Status.StatusCode);
+ }
+ Console.WriteLine("Passed!");
+ }).Wait();
+ }
+
// This is not an official interop test, but it's useful.
public static void RunBenchmarkEmptyUnary(TestServiceGrpc.ITestServiceClient client)
{