diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2015-02-05 10:56:49 -0800 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2015-02-05 11:10:06 -0800 |
commit | 8ce5e8bbccc4b2d0e7e3b26fe857c105ba68943e (patch) | |
tree | 9cb75e486ccaaa3d6047c03a21193d2ebfb26875 | |
parent | 6b9afb153a82c921c7e80365a4e129c462c0ebad (diff) |
Improved the server implementation to be able to register call handlers, also some refactoring
-rw-r--r-- | src/csharp/GrpcCore/Call.cs | 18 | ||||
-rw-r--r-- | src/csharp/GrpcCore/GrpcCore.csproj | 5 | ||||
-rw-r--r-- | src/csharp/GrpcCore/IMarshaller.cs | 31 | ||||
-rw-r--r-- | src/csharp/GrpcCore/Internal/AsyncCall.cs | 8 | ||||
-rw-r--r-- | src/csharp/GrpcCore/Internal/ServerSafeHandle.cs | 9 | ||||
-rw-r--r-- | src/csharp/GrpcCore/Internal/ServerWritingObserver.cs | 34 | ||||
-rw-r--r-- | src/csharp/GrpcCore/Internal/StreamingInputObserver.cs | 2 | ||||
-rw-r--r-- | src/csharp/GrpcCore/Method.cs | 64 | ||||
-rw-r--r-- | src/csharp/GrpcCore/Server.cs | 89 | ||||
-rw-r--r-- | src/csharp/GrpcCore/ServerCallHandler.cs | 93 | ||||
-rw-r--r-- | src/csharp/GrpcCore/ServerCalls.cs | 25 | ||||
-rw-r--r-- | src/csharp/GrpcCoreTests/ClientServerTest.cs | 39 | ||||
-rw-r--r-- | src/csharp/GrpcCoreTests/ServerTest.cs | 2 |
13 files changed, 361 insertions, 58 deletions
diff --git a/src/csharp/GrpcCore/Call.cs b/src/csharp/GrpcCore/Call.cs index bf257e5d59..735d1c544f 100644 --- a/src/csharp/GrpcCore/Call.cs +++ b/src/csharp/GrpcCore/Call.cs @@ -8,7 +8,6 @@ namespace Google.GRPC.Core readonly string methodName; readonly Func<TRequest, byte[]> requestSerializer; readonly Func<byte[], TResponse> responseDeserializer; - readonly TimeSpan timeout; readonly Channel channel; // TODO: channel param should be removed in the future. @@ -20,24 +19,23 @@ namespace Google.GRPC.Core this.methodName = methodName; this.requestSerializer = requestSerializer; this.responseDeserializer = responseDeserializer; - this.timeout = timeout; this.channel = channel; } - - public Channel Channel + // TODO: channel param should be removed in the future. + public Call(Method<TRequest, TResponse> method, Channel channel) { - get - { - return this.channel; - } + this.methodName = method.Name; + this.requestSerializer = method.RequestMarshaller.Serialize; + this.responseDeserializer = method.ResponseMarshaller.Deserialize; + this.channel = channel; } - public TimeSpan Timeout + public Channel Channel { get { - return this.timeout; + return this.channel; } } diff --git a/src/csharp/GrpcCore/GrpcCore.csproj b/src/csharp/GrpcCore/GrpcCore.csproj index f0c84e78ea..2ad0f9154c 100644 --- a/src/csharp/GrpcCore/GrpcCore.csproj +++ b/src/csharp/GrpcCore/GrpcCore.csproj @@ -54,6 +54,11 @@ <Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Internal\ServerSafeHandle.cs" /> <Compile Include="Internal\StreamingInputObserver.cs" /> + <Compile Include="Method.cs" /> + <Compile Include="IMarshaller.cs" /> + <Compile Include="ServerCalls.cs" /> + <Compile Include="ServerCallHandler.cs" /> + <Compile Include="Internal\ServerWritingObserver.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/GrpcCore/IMarshaller.cs b/src/csharp/GrpcCore/IMarshaller.cs new file mode 100644 index 0000000000..eb08d8d386 --- /dev/null +++ b/src/csharp/GrpcCore/IMarshaller.cs @@ -0,0 +1,31 @@ +using System; + +namespace Google.GRPC.Core +{ + /// <summary> + /// For serializing and deserializing messages. + /// </summary> + public interface IMarshaller<T> + { + byte[] Serialize(T value); + + T Deserialize(byte[] payload); + } + + /// <summary> + /// UTF-8 Marshalling for string. Useful for testing. + /// </summary> + internal class StringMarshaller : IMarshaller<string> { + + public byte[] Serialize(string value) + { + return System.Text.Encoding.UTF8.GetBytes(value); + } + + public string Deserialize(byte[] payload) + { + return System.Text.Encoding.UTF8.GetString(payload); + } + } +} + diff --git a/src/csharp/GrpcCore/Internal/AsyncCall.cs b/src/csharp/GrpcCore/Internal/AsyncCall.cs index e83ca0eaa9..c38363bb2b 100644 --- a/src/csharp/GrpcCore/Internal/AsyncCall.cs +++ b/src/csharp/GrpcCore/Internal/AsyncCall.cs @@ -86,6 +86,14 @@ namespace Google.GRPC.Core.Internal return StartRead().Task; } + public Task Halfclosed + { + get + { + return halfcloseTcs.Task; + } + } + public Task<Status> Finished { get diff --git a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs index 0d38bce63e..08d4cf0192 100644 --- a/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs +++ b/src/csharp/GrpcCore/Internal/ServerSafeHandle.cs @@ -30,8 +30,8 @@ namespace Google.GRPC.Core.Internal [DllImport("libgrpc.so")] static extern void grpc_server_shutdown(ServerSafeHandle server); - [DllImport("libgrpc.so")] - static extern void grpc_server_shutdown_and_notify(ServerSafeHandle server, IntPtr tag); + [DllImport("libgrpc.so", EntryPoint = "grpc_server_shutdown_and_notify")] + static extern void grpc_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] EventCallbackDelegate callback); [DllImport("libgrpc.so")] static extern void grpc_server_destroy(IntPtr server); @@ -62,6 +62,11 @@ namespace Google.GRPC.Core.Internal grpc_server_shutdown(this); } + public void ShutdownAndNotify(EventCallbackDelegate callback) + { + grpc_server_shutdown_and_notify_CALLBACK(this, callback); + } + public GRPCCallError RequestCall(EventCallbackDelegate callback) { return grpc_server_request_call_old_CALLBACK(this, callback); diff --git a/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs new file mode 100644 index 0000000000..e18eb9e9f1 --- /dev/null +++ b/src/csharp/GrpcCore/Internal/ServerWritingObserver.cs @@ -0,0 +1,34 @@ +using System; +using Google.GRPC.Core.Internal; + +namespace Google.GRPC.Core.Internal +{ + internal class ServerWritingObserver<TWrite, TRead> : IObserver<TWrite> + { + readonly AsyncCall<TWrite, TRead> call; + + public ServerWritingObserver(AsyncCall<TWrite, TRead> call) + { + this.call = call; + } + + public void OnCompleted() + { + // TODO: how bad is the Wait here? + call.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + } + + public void OnError(Exception error) + { + // TODO: handle this... + throw new InvalidOperationException("This should never be called."); + } + + public void OnNext(TWrite value) + { + // TODO: how bad is the Wait here? + call.WriteAsync(value).Wait(); + } + } +} + diff --git a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs index d483e53a2d..c5de979351 100644 --- a/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs +++ b/src/csharp/GrpcCore/Internal/StreamingInputObserver.cs @@ -1,7 +1,7 @@ using System; using Google.GRPC.Core.Internal; -namespace Google.GRPC.Core +namespace Google.GRPC.Core.Internal { internal class StreamingInputObserver<TWrite, TRead> : IObserver<TWrite> { diff --git a/src/csharp/GrpcCore/Method.cs b/src/csharp/GrpcCore/Method.cs new file mode 100644 index 0000000000..2790115695 --- /dev/null +++ b/src/csharp/GrpcCore/Method.cs @@ -0,0 +1,64 @@ +using System; + +namespace Google.GRPC.Core +{ + public enum MethodType + { + Unary, + ClientStreaming, + ServerStreaming, + DuplexStreaming + } + + /// <summary> + /// A description of a service method. + /// </summary> + public class Method<TRequest, TResponse> + { + readonly MethodType type; + readonly string name; + readonly IMarshaller<TRequest> requestMarshaller; + readonly IMarshaller<TResponse> responseMarshaller; + + public Method(MethodType type, string name, IMarshaller<TRequest> requestMarshaller, IMarshaller<TResponse> responseMarshaller) + { + this.type = type; + this.name = name; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + } + + public MethodType Type + { + get + { + return this.type; + } + } + + public string Name + { + get + { + return this.name; + } + } + + public IMarshaller<TRequest> RequestMarshaller + { + get + { + return this.requestMarshaller; + } + } + + public IMarshaller<TResponse> ResponseMarshaller + { + get + { + return this.responseMarshaller; + } + } + } +} + diff --git a/src/csharp/GrpcCore/Server.cs b/src/csharp/GrpcCore/Server.cs index 68da1a8300..4e9d114f85 100644 --- a/src/csharp/GrpcCore/Server.cs +++ b/src/csharp/GrpcCore/Server.cs @@ -1,7 +1,9 @@ using System; using System.Runtime.InteropServices; using System.Diagnostics; +using System.Threading.Tasks; using System.Collections.Concurrent; +using System.Collections.Generic; using Google.GRPC.Core.Internal; namespace Google.GRPC.Core @@ -15,10 +17,15 @@ namespace Google.GRPC.Core // TODO: make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. readonly EventCallbackDelegate newRpcHandler; + readonly EventCallbackDelegate serverShutdownHandler; readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); readonly ServerSafeHandle handle; + readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); + + readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); + static Server() { GrpcEnvironment.EnsureInitialized(); } @@ -28,8 +35,14 @@ namespace Google.GRPC.Core // TODO: what is the tag for server shutdown? this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); this.newRpcHandler = HandleNewRpc; + this.serverShutdownHandler = HandleServerShutdown; } + // only call before Start(), this will be in server builder in the future. + internal void AddCallHandler(string methodName, IServerCallHandler handler) { + callHandlers.Add(methodName, handler); + } + // only call before Start() public int AddPort(string addr) { return handle.AddPort(addr); } @@ -37,49 +50,57 @@ namespace Google.GRPC.Core public void Start() { handle.Start(); + + // TODO: this basically means the server is single threaded.... + StartHandlingRpcs(); } - public void RunRpc() + /// <summary> + /// Requests and handles single RPC call. + /// </summary> + internal void RunRpc() { AllowOneRpc(); - try { - var rpcInfo = newRpcQueue.Take(); - - Console.WriteLine("Server received RPC " + rpcInfo.Method); - - AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( - (payload) => payload, (payload) => payload); - - asyncCall.InitializeServer(rpcInfo.Call); + try + { + var rpcInfo = newRpcQueue.Take(); - asyncCall.Accept(GetCompletionQueue()); + Console.WriteLine("Server received RPC " + rpcInfo.Method); - while(true) { - byte[] payload = asyncCall.ReadAsync().Result; - if (payload == null) + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) { - break; - } + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); } - - asyncCall.WriteAsync(new byte[] { }).Wait(); - - // TODO: what should be the details? - asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); - - asyncCall.Finished.Wait(); - } catch(Exception e) { + catch(Exception e) + { Console.WriteLine("Exception while handling RPC: " + e); } } - // TODO: implement disposal properly... - public void Shutdown() { - handle.Shutdown(); + /// <summary> + /// Requests server shutdown and when there are no more calls being serviced, + /// cleans up used resources. + /// </summary> + /// <returns>The async.</returns> + public async Task ShutdownAsync() { + handle.ShutdownAndNotify(serverShutdownHandler); + await shutdownTcs.Task; + handle.Dispose(); + } + public void Kill() { + handle.Dispose(); + } - //handle.Dispose(); + private async Task StartHandlingRpcs() { + while (true) + { + await Task.Factory.StartNew(RunRpc); + } } private void AllowOneRpc() @@ -100,6 +121,18 @@ namespace Google.GRPC.Core } } + private void HandleServerShutdown(IntPtr eventPtr) + { + try + { + shutdownTcs.SetResult(null); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + private static void AssertCallOk(GRPCCallError callError) { Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs new file mode 100644 index 0000000000..08d527a019 --- /dev/null +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -0,0 +1,93 @@ +using System; +using Google.GRPC.Core.Internal; + +namespace Google.GRPC.Core +{ + internal interface IServerCallHandler + { + void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + } + + internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly UnaryRequestServerMethod<TRequest, TResponse> handler; + + public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var request = asyncCall.ReadAsync().Result; + + var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + handler(request, responseObserver); + + asyncCall.Halfclosed.Wait(); + // TODO: wait until writing is finished + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly StreamingRequestServerMethod<TRequest, TResponse> handler; + + public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + var requestObserver = handler(responseObserver); + + // feed the requests + asyncCall.StartReadingToStream(requestObserver); + + asyncCall.Halfclosed.Wait(); + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class NoSuchMethodCallHandler : IServerCallHandler + { + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + // We don't care about the payload type here. + AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( + (payload) => payload, (payload) => payload); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); + + asyncCall.Finished.Wait(); + } + } +} + diff --git a/src/csharp/GrpcCore/ServerCalls.cs b/src/csharp/GrpcCore/ServerCalls.cs new file mode 100644 index 0000000000..86c4718932 --- /dev/null +++ b/src/csharp/GrpcCore/ServerCalls.cs @@ -0,0 +1,25 @@ +using System; + +namespace Google.GRPC.Core +{ + // TODO: perhaps add also serverSideStreaming and clientSideStreaming + + public delegate void UnaryRequestServerMethod<TRequest, TResponse> (TRequest request, IObserver<TResponse> responseObserver); + + public delegate IObserver<TRequest> StreamingRequestServerMethod<TRequest, TResponse> (IObserver<TResponse> responseObserver); + + internal static class ServerCalls { + + public static IServerCallHandler UnaryRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + { + return new UnaryRequestServerCallHandler<TRequest, TResponse>(method, handler); + } + + public static IServerCallHandler StreamingRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + { + return new StreamingRequestServerCallHandler<TRequest, TResponse>(method, handler); + } + + } +} + diff --git a/src/csharp/GrpcCoreTests/ClientServerTest.cs b/src/csharp/GrpcCoreTests/ClientServerTest.cs index 823ee94288..511683b003 100644 --- a/src/csharp/GrpcCoreTests/ClientServerTest.cs +++ b/src/csharp/GrpcCoreTests/ClientServerTest.cs @@ -8,41 +8,48 @@ namespace Google.GRPC.Core.Tests { public class ClientServerTest { - string request = "REQUEST"; string serverAddr = "localhost:" + Utils.PickUnusedPort(); + private Method<string, string> unaryEchoStringMethod = new Method<string, string>( + MethodType.Unary, + "/tests.Test/UnaryEchoString", + new StringMarshaller(), + new StringMarshaller()); + [Test] public void EmptyCall() { Server server = new Server(); + + server.AddCallHandler(unaryEchoStringMethod.Name, + ServerCalls.UnaryRequestCall(unaryEchoStringMethod, HandleUnaryEchoString)); + server.AddPort(serverAddr); server.Start(); - Task.Factory.StartNew( - () => { - server.RunRpc(); - } - ); - using (Channel channel = new Channel(serverAddr)) { - CreateCall(channel); - string response = Calls.BlockingUnaryCall(CreateCall(channel), request, default(CancellationToken)); - Console.WriteLine("Received response: " + response); + var call = CreateUnaryEchoStringCall(channel); + + Assert.AreEqual("ABC", Calls.BlockingUnaryCall(call, "ABC", default(CancellationToken))); + Assert.AreEqual("abcdef", Calls.BlockingUnaryCall(call, "abcdef", default(CancellationToken))); } - server.Shutdown(); + server.ShutdownAsync().Wait(); GrpcEnvironment.Shutdown(); } - private Call<string, string> CreateCall(Channel channel) + private Call<string, string> CreateUnaryEchoStringCall(Channel channel) { - return new Call<string, string>("/tests.Test/EmptyCall", - (s) => System.Text.Encoding.ASCII.GetBytes(s), - (b) => System.Text.Encoding.ASCII.GetString(b), - Timeout.InfiniteTimeSpan, channel); + return new Call<string, string>(unaryEchoStringMethod, channel); + } + + private void HandleUnaryEchoString(string request, IObserver<string> responseObserver) { + responseObserver.OnNext(request); + responseObserver.OnCompleted(); } + } } diff --git a/src/csharp/GrpcCoreTests/ServerTest.cs b/src/csharp/GrpcCoreTests/ServerTest.cs index b34101bbf5..e6de95c336 100644 --- a/src/csharp/GrpcCoreTests/ServerTest.cs +++ b/src/csharp/GrpcCoreTests/ServerTest.cs @@ -12,7 +12,7 @@ namespace Google.GRPC.Core.Tests Server server = new Server(); server.AddPort("localhost:" + Utils.PickUnusedPort()); server.Start(); - server.Shutdown(); + server.ShutdownAsync().Wait(); GrpcEnvironment.Shutdown(); } |