aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/GrpcCore/ServerCallHandler.cs
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-05 10:56:49 -0800
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-02-05 11:10:06 -0800
commit8ce5e8bbccc4b2d0e7e3b26fe857c105ba68943e (patch)
tree9cb75e486ccaaa3d6047c03a21193d2ebfb26875 /src/csharp/GrpcCore/ServerCallHandler.cs
parent6b9afb153a82c921c7e80365a4e129c462c0ebad (diff)
Improved the server implementation to be able to register call handlers, also some refactoring
Diffstat (limited to 'src/csharp/GrpcCore/ServerCallHandler.cs')
-rw-r--r--src/csharp/GrpcCore/ServerCallHandler.cs93
1 files changed, 93 insertions, 0 deletions
diff --git a/src/csharp/GrpcCore/ServerCallHandler.cs b/src/csharp/GrpcCore/ServerCallHandler.cs
new file mode 100644
index 0000000000..08d527a019
--- /dev/null
+++ b/src/csharp/GrpcCore/ServerCallHandler.cs
@@ -0,0 +1,93 @@
+using System;
+using Google.GRPC.Core.Internal;
+
+namespace Google.GRPC.Core
+{
+ internal interface IServerCallHandler
+ {
+ void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
+ }
+
+ internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ {
+ readonly Method<TRequest, TResponse> method;
+ readonly UnaryRequestServerMethod<TRequest, TResponse> handler;
+
+ public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ var asyncCall = new AsyncCall<TResponse, TRequest>(
+ (msg) => method.ResponseMarshaller.Serialize(msg),
+ (payload) => method.RequestMarshaller.Deserialize(payload));
+
+ asyncCall.InitializeServer(call);
+ asyncCall.Accept(cq);
+
+ var request = asyncCall.ReadAsync().Result;
+
+ var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
+ handler(request, responseObserver);
+
+ asyncCall.Halfclosed.Wait();
+ // TODO: wait until writing is finished
+
+ asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
+ asyncCall.Finished.Wait();
+ }
+ }
+
+ internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ {
+ readonly Method<TRequest, TResponse> method;
+ readonly StreamingRequestServerMethod<TRequest, TResponse> handler;
+
+ public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
+ {
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ var asyncCall = new AsyncCall<TResponse, TRequest>(
+ (msg) => method.ResponseMarshaller.Serialize(msg),
+ (payload) => method.RequestMarshaller.Deserialize(payload));
+
+ asyncCall.InitializeServer(call);
+ asyncCall.Accept(cq);
+
+ var responseObserver = new ServerWritingObserver<TResponse, TRequest>(asyncCall);
+ var requestObserver = handler(responseObserver);
+
+ // feed the requests
+ asyncCall.StartReadingToStream(requestObserver);
+
+ asyncCall.Halfclosed.Wait();
+
+ asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_OK, "")).Wait();
+ asyncCall.Finished.Wait();
+ }
+ }
+
+ internal class NoSuchMethodCallHandler : IServerCallHandler
+ {
+ public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ // We don't care about the payload type here.
+ AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>(
+ (payload) => payload, (payload) => payload);
+
+ asyncCall.InitializeServer(call);
+ asyncCall.Accept(cq);
+ asyncCall.WriteStatusAsync(new Status(StatusCode.GRPC_STATUS_UNIMPLEMENTED, "No such method.")).Wait();
+
+ asyncCall.Finished.Wait();
+ }
+ }
+}
+