From 8ce5e8bbccc4b2d0e7e3b26fe857c105ba68943e Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 5 Feb 2015 10:56:49 -0800 Subject: Improved the server implementation to be able to register call handlers, also some refactoring --- src/csharp/GrpcCore/Call.cs | 18 ++--- src/csharp/GrpcCore/GrpcCore.csproj | 5 ++ src/csharp/GrpcCore/IMarshaller.cs | 31 ++++++++ src/csharp/GrpcCore/Internal/AsyncCall.cs | 8 ++ src/csharp/GrpcCore/Internal/ServerSafeHandle.cs | 9 ++- .../GrpcCore/Internal/ServerWritingObserver.cs | 34 ++++++++ .../GrpcCore/Internal/StreamingInputObserver.cs | 2 +- src/csharp/GrpcCore/Method.cs | 64 +++++++++++++++ src/csharp/GrpcCore/Server.cs | 89 ++++++++++++++------- src/csharp/GrpcCore/ServerCallHandler.cs | 93 ++++++++++++++++++++++ src/csharp/GrpcCore/ServerCalls.cs | 25 ++++++ src/csharp/GrpcCoreTests/ClientServerTest.cs | 39 +++++---- src/csharp/GrpcCoreTests/ServerTest.cs | 2 +- 13 files changed, 361 insertions(+), 58 deletions(-) create mode 100644 src/csharp/GrpcCore/IMarshaller.cs create mode 100644 src/csharp/GrpcCore/Internal/ServerWritingObserver.cs create mode 100644 src/csharp/GrpcCore/Method.cs create mode 100644 src/csharp/GrpcCore/ServerCallHandler.cs create mode 100644 src/csharp/GrpcCore/ServerCalls.cs diff --git a/src/csharp/GrpcCore/Call.cs b/src/csharp/GrpcCore/Call.cs index bf257e5d59..735d1c544f 100644 --- a/src/csharp/GrpcCore/Call.cs +++ b/src/csharp/GrpcCore/Call.cs @@ -8,7 +8,6 @@ namespace Google.GRPC.Core readonly string methodName; readonly Func requestSerializer; readonly Func responseDeserializer; - readonly TimeSpan timeout; readonly Channel channel; // TODO: channel param should be removed in the future. @@ -20,24 +19,23 @@ namespace Google.GRPC.Core this.methodName = methodName; this.requestSerializer = requestSerializer; this.responseDeserializer = responseDeserializer; - this.timeout = timeout; this.channel = channel; } - - public Channel Channel + // TODO: channel param should be removed in the future. + public Call(Method method, Channel channel) { - get - { - return this.channel; - } + this.methodName = method.Name; + this.requestSerializer = method.RequestMarshaller.Serialize; + this.responseDeserializer = method.ResponseMarshaller.Deserialize; + this.channel = channel; } - public TimeSpan Timeout + public Channel Channel { get { - return this.timeout; + return this.channel; } } diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index f0c84e78ea..2ad0f9154c 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -54,6 +54,11 @@ + + + + + diff --git a/src/csharp/GrpcCore/IMarshaller.cs b/src/csharp/GrpcCore/IMarshaller.cs new file mode 100644 index 0000000000..eb08d8d386 --- /dev/null +++ b/src/csharp/GrpcCore/IMarshaller.cs @@ -0,0 +1,31 @@ +using System; + +namespace Google.GRPC.Core +{ + /// + /// For serializing and deserializing messages. + /// + public interface IMarshaller + { + byte[] Serialize(T value); + + T Deserialize(byte[] payload); + } + + /// + /// UTF-8 Marshalling for string. Useful for testing. + /// + internal class StringMarshaller : IMarshaller { + + public byte[] Serialize(string value) + { + return System.Text.Encoding.UTF8.GetBytes(value); + } + + public string Deserialize(byte[] payload) + { + return System.Text.Encoding.UTF8.GetString(payload); + } + } +} + diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index e83ca0eaa9..c38363bb2b 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal return StartRead().Task; } + public Task Halfclosed + { + get + { + return halfcloseTcs.Task; + } + } + public Task Finished { get diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs index 0d38bce63e..08d4cf0192 100644 --- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs @@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal [DllImport("libgrpc.so")] static extern void grpc_server_shutdown(ServerSafeHandle server); - [DllImport("libgrpc.so")] - static extern void grpc_server_shutdown_and_notify(ServerSafeHandle server, IntPtr tag); + [DllImport("libgrpc.so", EntryPoint = "grpc_server_shutdown_and_notify")] + static extern void grpc_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); [DllImport("libgrpc.so")] static extern void grpc_server_destroy(IntPtr server); @@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal grpc_server_shutdown(this); } + public void ShutdownAndNotify(EventCallbackDelegate callback) + { + grpc_server_shutdown_and_notify_CALLBACK(this, callback); + } + public GRPCCallError RequestCall(EventCallbackDelegate callback) { return grpc_server_request_call_old_CALLBACK(this, callback); diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs new file mode 100644 index 0000000000..e18eb9e9f1 --- /dev/null +++ b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs @@ -0,0 +1,34 @@ +using System; +using Google.GRPC.Core.Internal; + +namespace Google.GRPC.Core.Internal +{ + internal class ServerWritingObserver : IObserver + { + readonly AsyncCall call; + + public ServerWritingObserver(AsyncCall call) + { + this.call = call; + } + + public void OnCompleted() + { + // TODO: how bad is the Wait here? + call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + } + + public void OnError(Exception error) + { + // TODO: handle this... + throw new InvalidOperationException("This should never be called."); + } + + public void OnNext(TWrite value) + { + // TODO: how bad is the Wait here? + call.WriteAsync(value).Wait(); + } + } +} + diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs index d483e53a2d..c5de979351 100644 --- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs +++ b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs @@ -1,7 +1,7 @@ using System; using Google.GRPC.Core.Internal; -namespace Google.GRPC.Core +namespace Google.GRPC.Core.Internal { internal class StreamingInputObserver : IObserver { diff --git a/src/csharp/GrpcCore/Method.cs b/src/csharp/GrpcCore/Method.cs new file mode 100644 index 0000000000..2790115695 --- /dev/null +++ b/src/csharp/GrpcCore/Method.cs @@ -0,0 +1,64 @@ +using System; + +namespace Google.GRPC.Core +{ + public enum MethodType + { + Unary, + ClientStreaming, + ServerStreaming, + DuplexStreaming + } + + /// + /// A description of a service method. + /// + public class Method + { + readonly MethodType type; + readonly string name; + readonly IMarshaller requestMarshaller; + readonly IMarshaller responseMarshaller; + + public Method(MethodType type, string name, IMarshaller requestMarshaller, IMarshaller responseMarshaller) + { + this.type = type; + this.name = name; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + } + + public MethodType Type + { + get + { + return this.type; + } + } + + public string Name + { + get + { + return this.name; + } + } + + public IMarshaller RequestMarshaller + { + get + { + return this.requestMarshaller; + } + } + + public IMarshaller ResponseMarshaller + { + get + { + return this.responseMarshaller; + } + } + } +} + diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs index 68da1a8300..4e9d114f85 100644 --- a/src/csharp/GrpcCore/Server.cs +++ b/src/csharp/GrpcCore/Server.cs @@ -1,7 +1,9 @@ using System; using System.Runtime.InteropServices; using System.Diagnostics; +using System.Threading.Tasks; using System.Collections.Concurrent; +using System.Collections.Generic; using Google.GRPC.Core.Internal; namespace Google.GRPC.Core @@ -15,10 +17,15 @@ namespace Google.GRPC.Core // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. readonly EventCallbackDelegate newRpcHandler; + readonly EventCallbackDelegate serverShutdownHandler; readonly BlockingCollection newRpcQueue = new BlockingCollection(); readonly ServerSafeHandle handle; + readonly Dictionary callHandlers = new Dictionary(); + + readonly TaskCompletionSource shutdownTcs = new TaskCompletionSource(); + static Server() { GrpcEnvironment.EnsureInitialized(); } @@ -28,8 +35,14 @@ namespace Google.GRPC.Core // TODO: what is the tag for server shutdown? this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); this.newRpcHandler = HandleNewRpc; + this.serverShutdownHandler = HandleServerShutdown; } + // only call before Start(), this will be in server builder in the future. + internal void AddCallHandler(string methodName, IServerCallHandler handler) { + callHandlers.Add(methodName, handler); + } + // only call before Start() public int AddPort(string addr) { return handle.AddPort(addr); } @@ -37,49 +50,57 @@ namespace Google.GRPC.Core public void Start() { handle.Start(); + + // TODO: this basically means the server is single threaded.... + StartHandlingRpcs(); } - public void RunRpc() + /// + /// Requests and handles single RPC call. + /// + internal void RunRpc() { AllowOneRpc(); - try { - var rpcInfo = newRpcQueue.Take(); - - Console.WriteLine("Server received RPC " + rpcInfo.Method); - - AsyncCall asyncCall = new AsyncCall( - (payload) => payload, (payload) => payload); - - asyncCall.InitializeServer(rpcInfo.Call); + try + { + var rpcInfo = newRpcQueue.Take(); - asyncCall.Accept(GetCompletionQueue()); + Console.WriteLine("Server received RPC " + rpcInfo.Method); - while(true) { - byte[] payload = asyncCall.ReadAsync().Result; - if (payload == null) + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) { - break; - } + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); } - - asyncCall.WriteAsync(new byte[] { }).Wait(); - - // TODO: what should be the details? - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); - - asyncCall.Finished.Wait(); - } catch(Exception e) { + catch(Exception e) + { Console.WriteLine("Exception while handling RPC: " + e); } } - // TODO: implement disposal properly... - public void Shutdown() { - handle.Shutdown(); + /// + /// Requests server shutdown and when there are no more calls being serviced, + /// cleans up used resources. + /// + /// The async. + public async Task ShutdownAsync() { + handle.ShutdownAndNotify(serverShutdownHandler); + await shutdownTcs.Task; + handle.Dispose(); + } + public void Kill() { + handle.Dispose(); + } - //handle.Dispose(); + private async Task StartHandlingRpcs() { + while (true) + { + await Task.Factory.StartNew(RunRpc); + } } private void AllowOneRpc() @@ -100,6 +121,18 @@ namespace Google.GRPC.Core } } + private void HandleServerShutdown(IntPtr eventPtr) + { + try + { + shutdownTcs.SetResult(null); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + private static void AssertCallOk(GRPCCallError callError) { Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs new file mode 100644 index 0000000000..08d527a019 --- /dev/null +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -0,0 +1,93 @@ +using System; +using Google.GRPC.Core.Internal; + +namespace Google.GRPC.Core +{ + internal interface IServerCallHandler + { + void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + } + + internal class UnaryRequestServerCallHandler : IServerCallHandler + { + readonly Method method; + readonly UnaryRequestServerMethod handler; + + public UnaryRequestServerCallHandler(Method method, UnaryRequestServerMethod handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var request = asyncCall.ReadAsync().Result; + + var responseObserver = new ServerWritingObserver(asyncCall); + handler(request, responseObserver); + + asyncCall.Halfclosed.Wait(); + // TODO: wait until writing is finished + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class StreamingRequestServerCallHandler : IServerCallHandler + { + readonly Method method; + readonly StreamingRequestServerMethod handler; + + public StreamingRequestServerCallHandler(Method method, StreamingRequestServerMethod handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var responseObserver = new ServerWritingObserver(asyncCall); + var requestObserver = handler(responseObserver); + + // feed the requests + asyncCall.StartReadingToStream(requestObserver); + + asyncCall.Halfclosed.Wait(); + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class NoSuchMethodCallHandler : IServerCallHandler + { + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + // We don't care about the payload type here. + AsyncCall asyncCall = new AsyncCall( + (payload) => payload, (payload) => payload); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); + + asyncCall.Finished.Wait(); + } + } +} + diff --git a/src/csharp/GrpcCore/ServerCalls.cs b/src/csharp/GrpcCore/ServerCalls.cs new file mode 100644 index 0000000000..86c4718932 --- /dev/null +++ b/src/csharp/GrpcCore/ServerCalls.cs @@ -0,0 +1,25 @@ +using System; + +namespace Google.GRPC.Core +{ + // TODO: perhaps add also serverSideStreaming and clientSideStreaming + + public delegate void UnaryRequestServerMethod (TRequest request, IObserver responseObserver); + + public delegate IObserver StreamingRequestServerMethod (IObserver responseObserver); + + internal static class ServerCalls { + + public static IServerCallHandler UnaryRequestCall(Method method, UnaryRequestServerMethod handler) + { + return new UnaryRequestServerCallHandler(method, handler); + } + + public static IServerCallHandler StreamingRequestCall(Method method, StreamingRequestServerMethod handler) + { + return new StreamingRequestServerCallHandler(method, handler); + } + + } +} + diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 823ee94288..511683b003 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -8,41 +8,48 @@ namespace Google.GRPC.Core.Tests { public class ClientServerTest { - string request = "REQUEST"; string serverAddr = "localhost:" + Utils.PickUnusedPort(); + private Method unaryEchoStringMethod = new Method( + MethodType.Unary, + "/tests.Test/UnaryEchoString", + new StringMarshaller(), + new StringMarshaller()); + [Test] public void EmptyCall() { Server server = new Server(); + + server.AddCallHandler(unaryEchoStringMethod.Name, + ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString)); + server.AddPort(serverAddr); server.Start(); - Task.Factory.StartNew( - () => { - server.RunRpc(); - } - ); - using (Channel channel = new Channel(serverAddr)) { - CreateCall(channel); - string response = Calls.BlockingUnaryCall(CreateCall(channel), request, default(CancellationToken)); - Console.WriteLine("Received response: " + response); + var call = CreateUnaryEchoStringCall(channel); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); + Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken))); } - server.Shutdown(); + server.ShutdownAsync().Wait(); GrpcEnvironment.Shutdown(); } - private Call CreateCall(Channel channel) + private Call CreateUnaryEchoStringCall(Channel channel) { - return new Call("/tests.Test/EmptyCall", - (s) => System.Text.Encoding.ASCII.GetBytes(s), - (b) => System.Text.Encoding.ASCII.GetString(b), - Timeout.InfiniteTimeSpan, channel); + return new Call(unaryEchoStringMethod, channel); + } + + private void HandleUnaryEchoString(string request, IObserver responseObserver) { + responseObserver.OnNext(request); + responseObserver.OnCompleted(); } + } } diff --git a/src/csharp/GrpcCoreTests/ServerTest.cs b/src/csharp/GrpcCoreTests/ServerTest.cs index b34101bbf5..e6de95c336 100644 --- a/src/csharp/GrpcCoreTests/ServerTest.cs +++ b/src/csharp/GrpcCoreTests/ServerTest.cs @@ -12,7 +12,7 @@ namespace Google.GRPC.Core.Tests Server server = new Server(); server.AddPort("localhost:" + Utils.PickUnusedPort()); server.Start(); - server.Shutdown(); + server.ShutdownAsync().Wait(); GrpcEnvironment.Shutdown(); } -- cgit v1.2.3