aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-05 10:56:49 -0800
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-05 11:10:06 -0800
commit8ce5e8bbccc4b2d0e7e3b26fe857c105ba68943e (patch)
tree9cb75e486ccaaa3d6047c03a21193d2ebfb26875
parent6b9afb153a82c921c7e80365a4e129c462c0ebad (diff)
Improved the server implementation to be able to register call handlers, also some refactoring
-rw-r--r--src/csharp/GrpcCore/Call.cs18
-rw-r--r--src/csharp/GrpcCore/GrpcCore.csproj5
-rw-r--r--src/csharp/GrpcCore/IMarshaller.cs31
-rw-r--r--src/csharp/GrpcCore/Internal/AsyncCall.cs8
-rw-r--r--src/csharp/GrpcCore/Internal/ServerSafeHandle.cs9
-rw-r--r--src/csharp/GrpcCore/Internal/ServerWritingObserver.cs34
-rw-r--r--src/csharp/GrpcCore/Internal/StreamingInputObserver.cs2
-rw-r--r--src/csharp/GrpcCore/Method.cs64
-rw-r--r--src/csharp/GrpcCore/Server.cs89
-rw-r--r--src/csharp/GrpcCore/ServerCallHandler.cs93
-rw-r--r--src/csharp/GrpcCore/ServerCalls.cs25
-rw-r--r--src/csharp/GrpcCoreTests/ClientServerTest.cs39
-rw-r--r--src/csharp/GrpcCoreTests/ServerTest.cs2
13 files changed, 361 insertions, 58 deletions
diff --git a/src/csharp/GrpcCore/Call.cs b/src/csharp/GrpcCore/Call.cs
index bf257e5d59..735d1c544f 100644
--- a/src/csharp/GrpcCore/Call.cs
+++ b/src/csharp/GrpcCore/Call.cs
@@ -8,7 +8,6 @@ namespace Google.GRPC.Core
readonly string methodName;
readonly Func<TRequest, byte[]> requestSerializer;
readonly Func<byte[], TResponse> responseDeserializer;
- readonly TimeSpan timeout;
readonly Channel channel;
// TODO: channel param should be removed in the future.
@@ -20,24 +19,23 @@ namespace Google.GRPC.Core
this.methodName = methodName;
this.requestSerializer = requestSerializer;
this.responseDeserializer = responseDeserializer;
- this.timeout = timeout;
this.channel = channel;
}
-
- public Channel Channel
+ // TODO: channel param should be removed in the future.
+ public Call(Method<TRequest, TResponse> method, Channel channel)
{
- get
- {
- return this.channel;
- }
+ this.methodName = method.Name;
+ this.requestSerializer = method.RequestMarshaller.Serialize;
+ this.responseDeserializer = method.ResponseMarshaller.Deserialize;
+ this.channel = channel;
}
- public TimeSpan Timeout
+ public Channel Channel
{
get
{
- return this.timeout;
+ return this.channel;
}
}
diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj
index f0c84e78ea..2ad0f9154c 100644
--- a/src/csharp/GrpcCore/GrpcCore.csproj
+++ b/src/csharp/GrpcCore/GrpcCore.csproj
@@ -54,6 +54,11 @@
<Compile Include="Internal\AsyncCall.cs" />
<Compile Include="Internal\ServerSafeHandle.cs" />
<Compile Include="Internal\StreamingInputObserver.cs" />
+ <Compile Include="Method.cs" />
+ <Compile Include="IMarshaller.cs" />
+ <Compile Include="ServerCalls.cs" />
+ <Compile Include="ServerCallHandler.cs" />
+ <Compile Include="Internal\ServerWritingObserver.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/GrpcCore/IMarshaller.cs b/src/csharp/GrpcCore/IMarshaller.cs
new file mode 100644
index 0000000000..eb08d8d386
--- /dev/null
+++ b/src/csharp/GrpcCore/IMarshaller.cs
@@ -0,0 +1,31 @@
+using System;
+
+namespace Google.GRPC.Core
+{
+ /// <summary>
+ /// For serializing and deserializing messages.
+ /// </summary>
+ public interface IMarshaller<T>
+ {
+ byte[] Serialize(T value);
+
+ T Deserialize(byte[] payload);
+ }
+
+ /// <summary>
+ /// UTF-8 Marshalling for string. Useful for testing.
+ /// </summary>
+ internal class StringMarshaller : IMarshaller<string> {
+
+ public byte[] Serialize(string value)
+ {
+ return System.Text.Encoding.UTF8.GetBytes(value);
+ }
+
+ public string Deserialize(byte[] payload)
+ {
+ return System.Text.Encoding.UTF8.GetString(payload);
+ }
+ }
+}
+
diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs
index e83ca0eaa9..c38363bb2b 100644
--- a/src/csharp/GrpcCore/Internal/AsyncCall.cs
+++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs
@@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal
return StartRead().Task;
}
+ public Task Halfclosed
+ {
+ get
+ {
+ return halfcloseTcs.Task;
+ }
+ }
+
public Task<Status> Finished
{
get
diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
index 0d38bce63e..08d4cf0192 100644
--- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
+++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs
@@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal
[DllImport("libgrpc.so")]
static extern void grpc_server_shutdown(ServerSafeHandle server);
- [DllImport("libgrpc.so")]
- static extern void grpc_server_shutdown_and_notify(ServerSafeHandle server, IntPtr tag);
+ [DllImport("libgrpc.so", EntryPoint = "grpc_server_shutdown_and_notify")]
+ static extern void grpc_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback);
[DllImport("libgrpc.so")]
static extern void grpc_server_destroy(IntPtr server);
@@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal
grpc_server_shutdown(this);
}
+ public void ShutdownAndNotify(EventCallbackDelegate callback)
+ {
+ grpc_server_shutdown_and_notify_CALLBACK(this, callback);
+ }
+
public GRPCCallError RequestCall(EventCallbackDelegate callback)
{
return grpc_server_request_call_old_CALLBACK(this, callback);
diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs
new file mode 100644
index 0000000000..e18eb9e9f1
--- /dev/null
+++ b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs
@@ -0,0 +1,34 @@
+using System;
+using Google.GRPC.Core.Internal;
+
+namespace Google.GRPC.Core.Internal
+{
+ internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite>
+ {
+ readonly AsyncCall<TWrite, TRead> call;
+
+ public ServerWritingObserver(AsyncCall<TWrite, TRead> call)
+ {
+ this.call = call;
+ }
+
+ public void OnCompleted()
+ {
+ // TODO: how bad is the Wait here?
+ call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
+ }
+
+ public void OnError(Exception error)
+ {
+ // TODO: handle this...
+ throw new InvalidOperationException("This should never be called.");
+ }
+
+ public void OnNext(TWrite value)
+ {
+ // TODO: how bad is the Wait here?
+ call.WriteAsync(value).Wait();
+ }
+ }
+}
+
diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
index d483e53a2d..c5de979351 100644
--- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
+++ b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs
@@ -1,7 +1,7 @@
using System;
using Google.GRPC.Core.Internal;
-namespace Google.GRPC.Core
+namespace Google.GRPC.Core.Internal
{
internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite>
{
diff --git a/src/csharp/GrpcCore/Method.cs b/src/csharp/GrpcCore/Method.cs
new file mode 100644
index 0000000000..2790115695
--- /dev/null
+++ b/src/csharp/GrpcCore/Method.cs
@@ -0,0 +1,64 @@
+using System;
+
+namespace Google.GRPC.Core
+{
+ public enum MethodType
+ {
+ Unary,
+ ClientStreaming,
+ ServerStreaming,
+ DuplexStreaming
+ }
+
+ /// <summary>
+ /// A description of a service method.
+ /// </summary>
+ public class Method<TRequest, TResponse>
+ {
+ readonly MethodType type;
+ readonly string name;
+ readonly IMarshaller<TRequest> requestMarshaller;
+ readonly IMarshaller<TResponse> responseMarshaller;
+
+ public Method(MethodType type, string name, IMarshaller<TRequest> requestMarshaller, IMarshaller<TResponse> responseMarshaller)
+ {
+ this.type = type;
+ this.name = name;
+ this.requestMarshaller = requestMarshaller;
+ this.responseMarshaller = responseMarshaller;
+ }
+
+ public MethodType Type
+ {
+ get
+ {
+ return this.type;
+ }
+ }
+
+ public string Name
+ {
+ get
+ {
+ return this.name;
+ }
+ }
+
+ public IMarshaller<TRequest> RequestMarshaller
+ {
+ get
+ {
+ return this.requestMarshaller;
+ }
+ }
+
+ public IMarshaller<TResponse> ResponseMarshaller
+ {
+ get
+ {
+ return this.responseMarshaller;
+ }
+ }
+ }
+}
+
diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs
index 68da1a8300..4e9d114f85 100644
--- a/src/csharp/GrpcCore/Server.cs
+++ b/src/csharp/GrpcCore/Server.cs
@@ -1,7 +1,9 @@
using System;
using System.Runtime.InteropServices;
using System.Diagnostics;
+using System.Threading.Tasks;
using System.Collections.Concurrent;
+using System.Collections.Generic;
using Google.GRPC.Core.Internal;
namespace Google.GRPC.Core
@@ -15,10 +17,15 @@ namespace Google.GRPC.Core
// TODO: make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly EventCallbackDelegate newRpcHandler;
+ readonly EventCallbackDelegate serverShutdownHandler;
readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
+ readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
+
+ readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
+
static Server() {
GrpcEnvironment.EnsureInitialized();
}
@@ -28,8 +35,14 @@ namespace Google.GRPC.Core
// TODO: what is the tag for server shutdown?
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
this.newRpcHandler = HandleNewRpc;
+ this.serverShutdownHandler = HandleServerShutdown;
}
+ // only call before Start(), this will be in server builder in the future.
+ internal void AddCallHandler(string methodName, IServerCallHandler handler) {
+ callHandlers.Add(methodName, handler);
+ }
+ // only call before Start()
public int AddPort(string addr) {
return handle.AddPort(addr);
}
@@ -37,49 +50,57 @@ namespace Google.GRPC.Core
public void Start()
{
handle.Start();
+
+ // TODO: this basically means the server is single threaded....
+ StartHandlingRpcs();
}
- public void RunRpc()
+ /// <summary>
+ /// Requests and handles single RPC call.
+ /// </summary>
+ internal void RunRpc()
{
AllowOneRpc();
- try {
- var rpcInfo = newRpcQueue.Take();
-
- Console.WriteLine("Server received RPC " + rpcInfo.Method);
-
- AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
- (payload) => payload, (payload) => payload);
-
- asyncCall.InitializeServer(rpcInfo.Call);
+ try
+ {
+ var rpcInfo = newRpcQueue.Take();
- asyncCall.Accept(GetCompletionQueue());
+ Console.WriteLine("Server received RPC " + rpcInfo.Method);
- while(true) {
- byte[] payload = asyncCall.ReadAsync().Result;
- if (payload == null)
+ IServerCallHandler callHandler;
+ if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
{
- break;
- }
+ callHandler = new NoSuchMethodCallHandler();
+ }
+ callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
}
-
- asyncCall.WriteAsync(new byte[] { }).Wait();
-
- // TODO: what should be the details?
- asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
-
- asyncCall.Finished.Wait();
- } catch(Exception e) {
+ catch(Exception e)
+ {
Console.WriteLine("Exception while handling RPC: " + e);
}
}
- // TODO: implement disposal properly...
- public void Shutdown() {
- handle.Shutdown();
+ /// <summary>
+ /// Requests server shutdown and when there are no more calls being serviced,
+ /// cleans up used resources.
+ /// </summary>
+ /// <returns>The async.</returns>
+ public async Task ShutdownAsync() {
+ handle.ShutdownAndNotify(serverShutdownHandler);
+ await shutdownTcs.Task;
+ handle.Dispose();
+ }
+ public void Kill() {
+ handle.Dispose();
+ }
- //handle.Dispose();
+ private async Task StartHandlingRpcs() {
+ while (true)
+ {
+ await Task.Factory.StartNew(RunRpc);
+ }
}
private void AllowOneRpc()
@@ -100,6 +121,18 @@ namespace Google.GRPC.Core
}
}
+ private void HandleServerShutdown(IntPtr eventPtr)
+ {
+ try
+ {
+ shutdownTcs.SetResult(null);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Caught exception in a native handler: " + e);
+ }
+ }
+
private static void AssertCallOk(GRPCCallError callError)
{
Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs
new file mode 100644
index 0000000000..08d527a019
--- /dev/null
+++ b/src/csharp/GrpcCore/ServerCallHandler.cs
@@ -0,0 +1,93 @@
+using System;
+using Google.GRPC.Core.Internal;
+
+namespace Google.GRPC.Core
+{
+ internal interface IServerCallHandler
+ {
+ void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
+ }
+
+ internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ {
+ readonly Method<TRequest, TResponse> method;
+ readonly UnaryRequestServerMethod<TRequest, TResponse> handler;
+
+ public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ var asyncCall = new AsyncCall<TResponse, TRequest>(
+ (msg) => method.ResponseMarshaller.Serialize(msg),
+ (payload) => method.RequestMarshaller.Deserialize(payload));
+
+ asyncCall.InitializeServer(call);
+ asyncCall.Accept(cq);
+
+ var request = asyncCall.ReadAsync().Result;
+
+ var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
+ handler(request, responseObserver);
+
+ asyncCall.Halfclosed.Wait();
+ // TODO: wait until writing is finished
+
+ asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
+ asyncCall.Finished.Wait();
+ }
+ }
+
+ internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ {
+ readonly Method<TRequest, TResponse> method;
+ readonly StreamingRequestServerMethod<TRequest, TResponse> handler;
+
+ public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ var asyncCall = new AsyncCall<TResponse, TRequest>(
+ (msg) => method.ResponseMarshaller.Serialize(msg),
+ (payload) => method.RequestMarshaller.Deserialize(payload));
+
+ asyncCall.InitializeServer(call);
+ asyncCall.Accept(cq);
+
+ var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
+ var requestObserver = handler(responseObserver);
+
+ // feed the requests
+ asyncCall.StartReadingToStream(requestObserver);
+
+ asyncCall.Halfclosed.Wait();
+
+ asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
+ asyncCall.Finished.Wait();
+ }
+ }
+
+ internal class NoSuchMethodCallHandler : IServerCallHandler
+ {
+ public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ // We don't care about the payload type here.
+ AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
+ (payload) => payload, (payload) => payload);
+
+ asyncCall.InitializeServer(call);
+ asyncCall.Accept(cq);
+ asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
+
+ asyncCall.Finished.Wait();
+ }
+ }
+}
+
diff --git a/src/csharp/GrpcCore/ServerCalls.cs b/src/csharp/GrpcCore/ServerCalls.cs
new file mode 100644
index 0000000000..86c4718932
--- /dev/null
+++ b/src/csharp/GrpcCore/ServerCalls.cs
@@ -0,0 +1,25 @@
+using System;
+
+namespace Google.GRPC.Core
+{
+ // TODO: perhaps add also serverSideStreaming and clientSideStreaming
+
+ public delegate void UnaryRequestServerMethod<TRequest, TResponse> (TRequest request, IObserver<TResponse> responseObserver);
+
+ public delegate IObserver<TRequest> StreamingRequestServerMethod<TRequest, TResponse> (IObserver<TResponse> responseObserver);
+
+ internal static class ServerCalls {
+
+ public static IServerCallHandler UnaryRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
+ {
+ return new UnaryRequestServerCallHandler<TRequest, TResponse>(method, handler);
+ }
+
+ public static IServerCallHandler StreamingRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
+ {
+ return new StreamingRequestServerCallHandler<TRequest, TResponse>(method, handler);
+ }
+
+ }
+}
+
diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs
index 823ee94288..511683b003 100644
--- a/src/csharp/GrpcCoreTests/ClientServerTest.cs
+++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs
@@ -8,41 +8,48 @@ namespace Google.GRPC.Core.Tests
{
public class ClientServerTest
{
- string request = "REQUEST";
string serverAddr = "localhost:" + Utils.PickUnusedPort();
+ private Method<string, string> unaryEchoStringMethod = new Method<string, string>(
+ MethodType.Unary,
+ "/tests.Test/UnaryEchoString",
+ new StringMarshaller(),
+ new StringMarshaller());
+
[Test]
public void EmptyCall()
{
Server server = new Server();
+
+ server.AddCallHandler(unaryEchoStringMethod.Name,
+ ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString));
+
server.AddPort(serverAddr);
server.Start();
- Task.Factory.StartNew(
- () => {
- server.RunRpc();
- }
- );
-
using (Channel channel = new Channel(serverAddr))
{
- CreateCall(channel);
- string response = Calls.BlockingUnaryCall(CreateCall(channel), request, default(CancellationToken));
- Console.WriteLine("Received response: " + response);
+ var call = CreateUnaryEchoStringCall(channel);
+
+ Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken)));
+ Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken)));
}
- server.Shutdown();
+ server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}
- private Call<string, string> CreateCall(Channel channel)
+ private Call<string, string> CreateUnaryEchoStringCall(Channel channel)
{
- return new Call<string, string>("/tests.Test/EmptyCall",
- (s) => System.Text.Encoding.ASCII.GetBytes(s),
- (b) => System.Text.Encoding.ASCII.GetString(b),
- Timeout.InfiniteTimeSpan, channel);
+ return new Call<string, string>(unaryEchoStringMethod, channel);
+ }
+
+ private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) {
+ responseObserver.OnNext(request);
+ responseObserver.OnCompleted();
}
+
}
}
diff --git a/src/csharp/GrpcCoreTests/ServerTest.cs b/src/csharp/GrpcCoreTests/ServerTest.cs
index b34101bbf5..e6de95c336 100644
--- a/src/csharp/GrpcCoreTests/ServerTest.cs
+++ b/src/csharp/GrpcCoreTests/ServerTest.cs
@@ -12,7 +12,7 @@ namespace Google.GRPC.Core.Tests
Server server = new Server();
server.AddPort("localhost:" + Utils.PickUnusedPort());
server.Start();
- server.Shutdown();
+ server.ShutdownAsync().Wait();
GrpcEnvironment.Shutdown();
}