diff options
author | 2015-02-05 10:56:49 -0800 | |
---|---|---|
committer | 2015-02-05 11:10:06 -0800 | |
commit | 8ce5e8bbccc4b2d0e7e3b26fe857c105ba68943e (patch) | |
tree | 9cb75e486ccaaa3d6047c03a21193d2ebfb26875 /src/csharp/GrpcCore/ServerCallHandler.cs | |
parent | 6b9afb153a82c921c7e80365a4e129c462c0ebad (diff) |
Improved the server implementation to be able to register call handlers, also some refactoring
Diffstat (limited to 'src/csharp/GrpcCore/ServerCallHandler.cs')
-rw-r--r-- | src/csharp/GrpcCore/ServerCallHandler.cs | 93 |
1 files changed, 93 insertions, 0 deletions
diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs new file mode 100644 index 0000000000..08d527a019 --- /dev/null +++ b/src/csharp/GrpcCore/ServerCallHandler.cs @@ -0,0 +1,93 @@ +using System; +using Google.GRPC.Core.Internal; + +namespace Google.GRPC.Core +{ + internal interface IServerCallHandler + { + void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + } + + internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly UnaryRequestServerMethod<TRequest, TResponse> handler; + + public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var request = asyncCall.ReadAsync().Result; + + var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + handler(request, responseObserver); + + asyncCall.Halfclosed.Wait(); + // TODO: wait until writing is finished + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly StreamingRequestServerMethod<TRequest, TResponse> handler; + + public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + (msg) => method.ResponseMarshaller.Serialize(msg), + (payload) => method.RequestMarshaller.Deserialize(payload)); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + + var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall); + var requestObserver = handler(responseObserver); + + // feed the requests + asyncCall.StartReadingToStream(requestObserver); + + asyncCall.Halfclosed.Wait(); + + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait(); + asyncCall.Finished.Wait(); + } + } + + internal class NoSuchMethodCallHandler : IServerCallHandler + { + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + // We don't care about the payload type here. + AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( + (payload) => payload, (payload) => payload); + + asyncCall.InitializeServer(call); + asyncCall.Accept(cq); + asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait(); + + asyncCall.Finished.Wait(); + } + } +} + |