diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/ServerCallHandler.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 79 |
1 files changed, 44 insertions, 35 deletions
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 1f83e51548..85b7a4b01e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -75,29 +75,32 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); - var result = await handler(request, context).ConfigureAwait(false); + var response = await handler(request, context).ConfigureAwait(false); status = context.Status; - await responseStream.WriteAsync(result).ConfigureAwait(false); + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -136,24 +139,26 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); await handler(request, responseStream, context).ConfigureAwait(false); status = context.Status; } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -187,33 +192,31 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { - var result = await handler(requestStream, context).ConfigureAwait(false); + var response = await handler(requestStream, context).ConfigureAwait(false); status = context.Status; - try - { - await responseStream.WriteAsync(result).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - status = Status.DefaultCancelled; - } + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -255,16 +258,20 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -282,9 +289,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } @@ -300,10 +305,14 @@ namespace Grpc.Core.Internal return rpcException.Status; } - // TODO(jtattermusch): what is the right status code here? return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } + public static WriteFlags GetWriteFlags(WriteOptions writeOptions) + { + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); + } + public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) where TRequest : class where TResponse : class |