aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/ServerCallHandler.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs204
1 files changed, 169 insertions, 35 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 25fd4fab8f..01b2a11369 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -33,6 +33,7 @@
using System;
using System.Linq;
+using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -40,96 +41,229 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
- void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
+ Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq);
}
- internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
readonly Method<TRequest, TResponse> method;
- readonly UnaryRequestServerMethod<TRequest, TResponse> handler;
+ readonly UnaryServerMethod<TRequest, TResponse> handler;
- public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler)
+ public UnaryServerCallHandler(Method<TRequest, TResponse> method, UnaryServerMethod<TRequest, TResponse> handler)
{
this.method = method;
this.handler = handler;
}
- public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
asyncCall.Initialize(call);
-
- var requestObserver = new RecordingObserver<TRequest>();
- var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
- var request = requestObserver.ToList().Result.Single();
- var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall);
- handler(request, responseObserver);
-
- finishedTask.Wait();
+ Status status = Status.DefaultSuccess;
+ try
+ {
+ var request = await requestStream.ReadNext();
+ // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
+ Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+ var result = await handler(request);
+ await responseStream.Write(result);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured in handler: " + e);
+ status = HandlerUtils.StatusFromException(e);
+ }
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask;
}
}
- internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler
+ internal class ServerStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
readonly Method<TRequest, TResponse> method;
- readonly StreamingRequestServerMethod<TRequest, TResponse> handler;
+ readonly ServerStreamingServerMethod<TRequest, TResponse> handler;
- public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler)
+ public ServerStreamingServerCallHandler(Method<TRequest, TResponse> method, ServerStreamingServerMethod<TRequest, TResponse> handler)
{
this.method = method;
this.handler = handler;
}
- public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer);
asyncCall.Initialize(call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
+
+ Status status = Status.DefaultSuccess;
+ try
+ {
+ var request = await requestStream.ReadNext();
+ // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
+ Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+
+ await handler(request, responseStream);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured in handler: " + e);
+ status = HandlerUtils.StatusFromException(e);
+ }
- var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall);
- var requestObserver = handler(responseObserver);
- var finishedTask = asyncCall.ServerSideCallAsync(requestObserver);
- finishedTask.Wait();
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask;
}
}
- internal class NoSuchMethodCallHandler : IServerCallHandler
+ internal class ClientStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
- public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ readonly Method<TRequest, TResponse> method;
+ readonly ClientStreamingServerMethod<TRequest, TResponse> handler;
+
+ public ClientStreamingServerCallHandler(Method<TRequest, TResponse> method, ClientStreamingServerMethod<TRequest, TResponse> handler)
{
- // We don't care about the payload type here.
- var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload);
+ this.method = method;
+ this.handler = handler;
+ }
- asyncCall.Initialize(call);
+ public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
+ method.ResponseMarshaller.Serializer,
+ method.RequestMarshaller.Deserializer);
- var finishedTask = asyncCall.ServerSideCallAsync(new NullObserver<byte[]>());
+ asyncCall.Initialize(call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
- // TODO: check result of the completion status.
- asyncCall.StartSendStatusFromServer(new Status(StatusCode.Unimplemented, "No such method."), new AsyncCompletionDelegate((error) => { }));
+ Status status = Status.DefaultSuccess;
+ try
+ {
+ var result = await handler(requestStream);
+ try
+ {
+ await responseStream.Write(result);
+ }
+ catch (OperationCanceledException)
+ {
+ status = Status.DefaultCancelled;
+ }
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured in handler: " + e);
+ status = HandlerUtils.StatusFromException(e);
+ }
- finishedTask.Wait();
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask;
}
}
- internal class NullObserver<T> : IObserver<T>
+ internal class DuplexStreamingServerCallHandler<TRequest, TResponse> : IServerCallHandler
{
- public void OnCompleted()
+ readonly Method<TRequest, TResponse> method;
+ readonly DuplexStreamingServerMethod<TRequest, TResponse> handler;
+
+ public DuplexStreamingServerCallHandler(Method<TRequest, TResponse> method, DuplexStreamingServerMethod<TRequest, TResponse> handler)
{
+ this.method = method;
+ this.handler = handler;
+ }
+
+ public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
+ {
+ var asyncCall = new AsyncCallServer<TRequest, TResponse>(
+ method.ResponseMarshaller.Serializer,
+ method.RequestMarshaller.Deserializer);
+
+ asyncCall.Initialize(call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
+ var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
+
+ Status status = Status.DefaultSuccess;
+ try
+ {
+ await handler(requestStream, responseStream);
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception occured in handler: " + e);
+ status = HandlerUtils.StatusFromException(e);
+ }
+ try
+ {
+ await responseStream.WriteStatus(status);
+ }
+ catch (OperationCanceledException)
+ {
+ // Call has been already cancelled.
+ }
+ await finishedTask;
}
+ }
- public void OnError(Exception error)
+ internal class NoSuchMethodCallHandler : IServerCallHandler
+ {
+ public async Task HandleCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq)
{
+ // We don't care about the payload type here.
+ var asyncCall = new AsyncCallServer<byte[], byte[]>(
+ (payload) => payload, (payload) => payload);
+
+ asyncCall.Initialize(call);
+ var finishedTask = asyncCall.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
+ var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
+
+ await responseStream.WriteStatus(new Status(StatusCode.Unimplemented, "No such method."));
+ // TODO(jtattermusch): if we don't read what client has sent, the server call never gets disposed.
+ await requestStream.ToList();
+ await finishedTask;
}
+ }
- public void OnNext(T value)
+ internal static class HandlerUtils
+ {
+ public static Status StatusFromException(Exception e)
{
+ // TODO(jtattermusch): what is the right status code here?
+ return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
}
}