aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2016-03-21 16:25:59 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2016-03-25 16:28:16 -0700
commit253769e92d5ff1883c1623fd0ee130ae4ce4b380 (patch)
tree75bb586debed33f548e667727b42a105ca1f694c /src
parente45ca5f59286ef8a3b617e5f9c49f07f9fcfeefd (diff)
add ASYNC_GENERIC_SERVER support for C#
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs21
-rw-r--r--src/csharp/Grpc.IntegrationTesting/GenericService.cs71
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ServerRunners.cs46
4 files changed, 116 insertions, 23 deletions
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index a749f4a8a3..e6dc2321c4 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -94,7 +94,7 @@ namespace Grpc.IntegrationTesting
}
var channel = new Channel(target, credentials, channelOptions);
- return new SimpleClientRunner(channel,
+ return new ClientRunnerImpl(channel,
config.ClientType,
config.RpcType,
config.PayloadConfig,
@@ -102,23 +102,10 @@ namespace Grpc.IntegrationTesting
}
}
- /// <summary>
- /// Simple protobuf client.
- /// </summary>
- public class SimpleClientRunner : IClientRunner
+ public class ClientRunnerImpl : IClientRunner
{
const double SecondsToNanos = 1e9;
- readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
-
- readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
- MethodType.DuplexStreaming,
- "grpc.testing.BenchmarkService",
- "StreamingCall",
- ByteArrayMarshaller,
- ByteArrayMarshaller
- );
-
readonly Channel channel;
readonly ClientType clientType;
readonly RpcType rpcType;
@@ -130,7 +117,7 @@ namespace Grpc.IntegrationTesting
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
- public SimpleClientRunner(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
+ public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
this.clientType = clientType;
@@ -228,7 +215,7 @@ namespace Grpc.IntegrationTesting
var request = CreateByteBufferRequest();
var stopwatch = new Stopwatch();
- var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, StreamingCallMethod, new CallOptions());
+ var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
{
diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
new file mode 100644
index 0000000000..c6128264ac
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
@@ -0,0 +1,71 @@
+#region Copyright notice and license
+
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Grpc.Core;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+using Grpc.Testing;
+
+namespace Grpc.IntegrationTesting
+{
+ /// <summary>
+ /// Utility methods for defining and calling a service that doesn't use protobufs
+ /// for serialization/deserialization.
+ /// </summary>
+ public static class GenericService
+ {
+ readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
+
+ public readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
+ MethodType.DuplexStreaming,
+ "grpc.testing.BenchmarkService",
+ "StreamingCall",
+ ByteArrayMarshaller,
+ ByteArrayMarshaller
+ );
+
+ public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler)
+ {
+ return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName)
+ .AddMethod(StreamingCallMethod, handler).Build();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 372991374e..4c049944ea 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -120,6 +120,7 @@
<Compile Include="WorkerServiceImpl.cs" />
<Compile Include="QpsWorker.cs" />
<Compile Include="WallClockStopwatch.cs" />
+ <Compile Include="GenericService.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
index 516436ac5a..c326378cfa 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
@@ -61,7 +61,6 @@ namespace Grpc.IntegrationTesting
public static IServerRunner CreateStarted(ServerConfig config)
{
Logger.Debug("ServerConfig: {0}", config);
- GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER, "Only ASYNC_SERVER supported for C# QpsWorker");
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure;
if (config.AsyncServerThreads != 0)
@@ -77,17 +76,53 @@ namespace Grpc.IntegrationTesting
Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value");
}
- GrpcPreconditions.CheckArgument(config.PayloadConfig == null,
- "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server.");
+ ServerServiceDefinition service = null;
+ if (config.ServerType == ServerType.ASYNC_SERVER)
+ {
+ GrpcPreconditions.CheckArgument(config.PayloadConfig == null,
+ "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server.");
+ service = BenchmarkService.BindService(new BenchmarkServiceImpl());
+ }
+ else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER)
+ {
+ var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize);
+ service = GenericService.BindHandler(genericService.StreamingCall);
+ }
+ else
+ {
+ throw new ArgumentException("Unsupported ServerType");
+ }
+
var server = new Server
{
- Services = { BenchmarkService.BindService(new BenchmarkServiceImpl()) },
+ Services = { service },
Ports = { new ServerPort("[::]", config.Port, credentials) }
};
server.Start();
return new ServerRunnerImpl(server);
}
+
+ private class GenericServiceImpl
+ {
+ readonly byte[] response;
+
+ public GenericServiceImpl(int responseSize)
+ {
+ this.response = new byte[responseSize];
+ }
+
+ /// <summary>
+ /// Generic streaming call handler.
+ /// </summary>
+ public async Task StreamingCall(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext context)
+ {
+ await requestStream.ForEachAsync(async request =>
+ {
+ await responseStream.WriteAsync(response);
+ });
+ }
+ }
}
/// <summary>
@@ -136,6 +171,5 @@ namespace Grpc.IntegrationTesting
{
return server.ShutdownAsync();
}
- }
-
+ }
}