aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.sln2
-rw-r--r--src/csharp/GrpcCore/GrpcCore.csproj1
-rw-r--r--src/csharp/GrpcCore/Utils/RecordingQueue.cs45
-rw-r--r--src/csharp/InteropClient/InteropClient.cs134
4 files changed, 179 insertions, 3 deletions
diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln
index 2fd10cb94a..a7b2c9b580 100644
--- a/src/csharp/Grpc.sln
+++ b/src/csharp/Grpc.sln
@@ -45,6 +45,6 @@ Global
{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(MonoDevelopProperties) = preSolution
- StartupItem = GrpcApi\GrpcApi.csproj
+ StartupItem = InteropClient\InteropClient.csproj
EndGlobalSection
EndGlobal
diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj
index fbfe50e4d8..95df890917 100644
--- a/src/csharp/GrpcCore/GrpcCore.csproj
+++ b/src/csharp/GrpcCore/GrpcCore.csproj
@@ -62,6 +62,7 @@
<Compile Include="ServerServiceDefinition.cs" />
<Compile Include="Utils\RecordingObserver.cs" />
<Compile Include="Utils\PortPicker.cs" />
+ <Compile Include="Utils\RecordingQueue.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/GrpcCore/Utils/RecordingQueue.cs b/src/csharp/GrpcCore/Utils/RecordingQueue.cs
new file mode 100644
index 0000000000..8e2f8a496d
--- /dev/null
+++ b/src/csharp/GrpcCore/Utils/RecordingQueue.cs
@@ -0,0 +1,45 @@
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace Google.GRPC.Core.Utils
+{
+ public class RecordingQueue<T> : IObserver<T>
+ {
+ readonly BlockingCollection<T> queue = new BlockingCollection<T>();
+ TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
+
+ public void OnCompleted()
+ {
+ tcs.SetResult(null);
+ }
+
+ public void OnError(Exception error)
+ {
+ tcs.SetException(error);
+ }
+
+ public void OnNext(T value)
+ {
+ queue.Add(value);
+ }
+
+ public BlockingCollection<T> Queue
+ {
+ get
+ {
+ return queue;
+ }
+ }
+
+ public Task Finished
+ {
+ get
+ {
+ return tcs.Task;
+ }
+ }
+ }
+}
+
diff --git a/src/csharp/InteropClient/InteropClient.cs b/src/csharp/InteropClient/InteropClient.cs
index 62091c5fdc..40c4781304 100644
--- a/src/csharp/InteropClient/InteropClient.cs
+++ b/src/csharp/InteropClient/InteropClient.cs
@@ -1,7 +1,9 @@
using System;
+using System.Collections.Generic;
using NUnit.Framework;
using System.Text.RegularExpressions;
using Google.GRPC.Core;
+using Google.GRPC.Core.Utils;
using Google.ProtocolBuffers;
using grpc.testing;
@@ -78,6 +80,18 @@ namespace InteropClient
case "large_unary":
RunLargeUnary(client);
break;
+ case "client_streaming":
+ RunClientStreaming(client);
+ break;
+ case "server_streaming":
+ RunServerStreaming(client);
+ break;
+ case "ping_pong":
+ RunPingPong(client);
+ break;
+ case "empty_stream":
+ RunEmptyStream(client);
+ break;
default:
throw new ArgumentException("Unknown test case " + testCase);
}
@@ -88,6 +102,7 @@ namespace InteropClient
Console.WriteLine("running empty_unary");
var response = client.EmptyCall(Empty.DefaultInstance);
Assert.IsNotNull(response);
+ Console.WriteLine("Passed!");
}
private void RunLargeUnary(TestServiceGrpc.ITestServiceClient client)
@@ -96,14 +111,129 @@ namespace InteropClient
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.SetResponseSize(314159)
- .SetPayload(Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[271828])))
+ .SetPayload(CreateZerosPayload(271828))
.Build();
var response = client.UnaryCall(request);
Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
Assert.AreEqual(314159, response.Payload.Body.Length);
- // TODO: assert that the response is all zeros...
+ Console.WriteLine("Passed!");
+ }
+
+ private void RunClientStreaming(TestServiceGrpc.ITestServiceClient client)
+ {
+ Console.WriteLine("running client_streaming");
+
+ var bodySizes = new List<int>{27182, 8, 1828, 45904};
+
+ var context = client.StreamingInputCall();
+ foreach (var size in bodySizes)
+ {
+ context.Inputs.OnNext(
+ StreamingInputCallRequest.CreateBuilder().SetPayload(CreateZerosPayload(size)).Build());
+ }
+ context.Inputs.OnCompleted();
+
+ var response = context.Task.Result;
+ Assert.AreEqual(74922, response.AggregatedPayloadSize);
+ Console.WriteLine("Passed!");
+ }
+
+ private void RunServerStreaming(TestServiceGrpc.ITestServiceClient client)
+ {
+ Console.WriteLine("running server_streaming");
+
+ var bodySizes = new List<int>{31415, 9, 2653, 58979};
+
+ var request = StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddRangeResponseParameters(bodySizes.ConvertAll(
+ (size) => ResponseParameters.CreateBuilder().SetSize(size).Build()))
+ .Build();
+
+ var recorder = new RecordingObserver<StreamingOutputCallResponse>();
+ client.StreamingOutputCall(request, recorder);
+
+ var responseList = recorder.ToList().Result;
+
+ foreach (var res in responseList)
+ {
+ Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type);
+ }
+ CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length));
+ Console.WriteLine("Passed!");
+ }
+
+ private void RunPingPong(TestServiceGrpc.ITestServiceClient client)
+ {
+ Console.WriteLine("running ping_pong");
+
+ var recorder = new RecordingQueue<StreamingOutputCallResponse>();
+ var inputs = client.FullDuplexCall(recorder);
+
+ StreamingOutputCallResponse response;
+
+ inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
+ .SetPayload(CreateZerosPayload(27182)).Build());
+
+ response = recorder.Queue.Take();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(31415, response.Payload.Body.Length);
+
+ inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
+ .SetPayload(CreateZerosPayload(8)).Build());
+
+ response = recorder.Queue.Take();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(9, response.Payload.Body.Length);
+
+ inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2635))
+ .SetPayload(CreateZerosPayload(1828)).Build());
+
+ response = recorder.Queue.Take();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(2653, response.Payload.Body.Length);
+
+
+ inputs.OnNext(StreamingOutputCallRequest.CreateBuilder()
+ .SetResponseType(PayloadType.COMPRESSABLE)
+ .AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
+ .SetPayload(CreateZerosPayload(45904)).Build());
+
+ response = recorder.Queue.Take();
+ Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
+ Assert.AreEqual(58979, response.Payload.Body.Length);
+
+ recorder.Finished.Wait();
+ Assert.AreEqual(0, recorder.Queue.Count);
+
+ Console.WriteLine("Passed!");
+ }
+
+ private void RunEmptyStream(TestServiceGrpc.ITestServiceClient client)
+ {
+ Console.WriteLine("running empty_stream");
+
+ var recorder = new RecordingObserver<StreamingOutputCallResponse>();
+ var inputs = client.FullDuplexCall(recorder);
+ inputs.OnCompleted();
+
+ var responseList = recorder.ToList().Result;
+ Assert.AreEqual(0, responseList.Count);
+
+ Console.WriteLine("Passed!");
+ }
+
+
+ private Payload CreateZerosPayload(int size) {
+ return Payload.CreateBuilder().SetBody(ByteString.CopyFrom(new byte[size])).Build();
}
private static ClientOptions ParseArguments(string[] args)