diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/ServerCallHandler.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 204 |
1 files changed, 169 insertions, 35 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 25fd4fab8f..01b2a11369 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -33,6 +33,7 @@ using System; using System.Linq; +using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Utils; @@ -40,96 +41,229 @@ namespace Grpc.Core.Internal { internal interface IServerCallHandler { - void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); } - internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler { readonly Method<TRequest, TResponse> method; - readonly UnaryRequestServerMethod<TRequest, TResponse> handler; + readonly UnaryServerMethod<TRequest, TResponse> handler; - public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler) { this.method = method; this.handler = handler; } - public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer); asyncCall.Initialize(call); - - var requestObserver = new RecordingObserver<TRequest>(); - var finishedTask = asyncCall.ServerSideCallAsync(requestObserver); + var finishedTask = asyncCall.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); + var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - var request = requestObserver.ToList().Result.Single(); - var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall); - handler(request, responseObserver); - - finishedTask.Wait(); + Status status = Status.DefaultSuccess; + try + { + var request = await requestStream.ReadNext(); + // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. + Preconditions.CheckArgument(await requestStream.ReadNext() == null); + var result = await handler(request); + await responseStream.Write(result); + } + catch (Exception e) + { + Console.WriteLine("Exception occured in handler: " + e); + status = HandlerUtils.StatusFromException(e); + } + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } + await finishedTask; } } - internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler { readonly Method<TRequest, TResponse> method; - readonly StreamingRequestServerMethod<TRequest, TResponse> handler; + readonly ServerStreamingServerMethod<TRequest, TResponse> handler; - public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler) { this.method = method; this.handler = handler; } - public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer); asyncCall.Initialize(call); + var finishedTask = asyncCall.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); + var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); + + Status status = Status.DefaultSuccess; + try + { + var request = await requestStream.ReadNext(); + // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. + Preconditions.CheckArgument(await requestStream.ReadNext() == null); + + await handler(request, responseStream); + } + catch (Exception e) + { + Console.WriteLine("Exception occured in handler: " + e); + status = HandlerUtils.StatusFromException(e); + } - var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall); - var requestObserver = handler(responseObserver); - var finishedTask = asyncCall.ServerSideCallAsync(requestObserver); - finishedTask.Wait(); + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } + await finishedTask; } } - internal class NoSuchMethodCallHandler : IServerCallHandler + internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler { - public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + readonly Method<TRequest, TResponse> method; + readonly ClientStreamingServerMethod<TRequest, TResponse> handler; + + public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler) { - // We don't care about the payload type here. - var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload); + this.method = method; + this.handler = handler; + } - asyncCall.Initialize(call); + public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCallServer<TRequest, TResponse>( + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); - var finishedTask = asyncCall.ServerSideCallAsync(new NullObserver<byte[]>()); + asyncCall.Initialize(call); + var finishedTask = asyncCall.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); + var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - // TODO: check result of the completion status. - asyncCall.StartSendStatusFromServer(new Status(StatusCode.Unimplemented, "No such method."), new AsyncCompletionDelegate((error) => { })); + Status status = Status.DefaultSuccess; + try + { + var result = await handler(requestStream); + try + { + await responseStream.Write(result); + } + catch (OperationCanceledException) + { + status = Status.DefaultCancelled; + } + } + catch (Exception e) + { + Console.WriteLine("Exception occured in handler: " + e); + status = HandlerUtils.StatusFromException(e); + } - finishedTask.Wait(); + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } + await finishedTask; } } - internal class NullObserver<T> : IObserver<T> + internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler { - public void OnCompleted() + readonly Method<TRequest, TResponse> method; + readonly DuplexStreamingServerMethod<TRequest, TResponse> handler; + + public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler) { + this.method = method; + this.handler = handler; + } + + public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCallServer<TRequest, TResponse>( + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); + + asyncCall.Initialize(call); + var finishedTask = asyncCall.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); + var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); + + Status status = Status.DefaultSuccess; + try + { + await handler(requestStream, responseStream); + } + catch (Exception e) + { + Console.WriteLine("Exception occured in handler: " + e); + status = HandlerUtils.StatusFromException(e); + } + try + { + await responseStream.WriteStatus(status); + } + catch (OperationCanceledException) + { + // Call has been already cancelled. + } + await finishedTask; } + } - public void OnError(Exception error) + internal class NoSuchMethodCallHandler : IServerCallHandler + { + public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { + // We don't care about the payload type here. + var asyncCall = new AsyncCallServer<byte[], byte[]>( + (payload) => payload, (payload) => payload); + + asyncCall.Initialize(call); + var finishedTask = asyncCall.ServerSideCallAsync(); + var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); + var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); + + await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method.")); + // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed. + await requestStream.ToList(); + await finishedTask; } + } - public void OnNext(T value) + internal static class HandlerUtils + { + public static Status StatusFromException(Exception e) { + // TODO(jtattermusch): what is the right status code here? + return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } } } |