diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 18 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 61 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 58 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 7 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ClientResponseStream.cs | 4 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/INativeCall.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/NativeMethods.cs | 18 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 79 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerRequestStream.cs | 4 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerResponseStream.cs | 7 |
10 files changed, 145 insertions, 113 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 016e1b8587..f522174bd0 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -241,11 +241,10 @@ namespace Grpc.Core.Internal /// <summary> /// Receives a streaming response. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate) + public Task<TResponse> ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// <summary> @@ -409,10 +408,13 @@ namespace Grpc.Core.Internal /// </summary> private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) { + // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, + // success will be always set to true. + using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse")) { TResponse msg = default(TResponse); - var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null; + var deserializeException = TryDeserialize(receivedMessage, out msg); lock (myLock) { @@ -425,14 +427,13 @@ namespace Grpc.Core.Internal finishedStatus = receivedStatus; ReleaseResourcesIfPossible(); - } responseHeadersTcs.SetResult(responseHeaders); var status = receivedStatus.Status; - if (!success || status.StatusCode != StatusCode.OK) + if (status.StatusCode != StatusCode.OK) { unaryResponseTcs.SetException(new RpcException(status)); return; @@ -447,6 +448,9 @@ namespace Grpc.Core.Internal /// </summary> private void HandleFinished(bool success, ClientSideStatus receivedStatus) { + // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT, + // success will be always set to true. + lock (myLock) { finished = true; @@ -457,7 +461,7 @@ namespace Grpc.Core.Internal var status = receivedStatus.Status; - if (!success || status.StatusCode != StatusCode.OK) + if (status.StatusCode != StatusCode.OK) { streamingCallFinishedTcs.SetException(new RpcException(status)); return; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index ccd047f469..42234dcac2 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -68,7 +68,8 @@ namespace Grpc.Core.Internal protected bool cancelRequested; protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource<object> sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. protected bool halfcloseRequested; // True if send close have been initiated. @@ -150,15 +151,25 @@ namespace Grpc.Core.Internal /// Initiates reading a message. Only one read operation can be active at a time. /// completionDelegate is invoked upon completion. /// </summary> - protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate) + protected Task<TRead> ReadMessageInternalAsync() { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckReadingAllowed(); + GrpcPreconditions.CheckState(started); + if (readingDone) + { + // the last read that returns null or throws an exception is idempotent + // and maintain its state. + GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); + return streamingReadTcs.Task; + } + + GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time"); + GrpcPreconditions.CheckState(!disposed); call.StartReceiveMessage(HandleReadFinished); - readCompletionDelegate = completionDelegate; + streamingReadTcs = new TaskCompletionSource<TRead>(); + return streamingReadTcs.Task; } } @@ -213,15 +224,6 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected virtual void CheckReadingAllowed() - { - GrpcPreconditions.CheckState(started); - GrpcPreconditions.CheckState(!disposed); - - GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed."); - GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); - } - protected void CheckNotCancelled() { if (cancelRequested) @@ -322,22 +324,18 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleSendStatusFromServerFinished(bool success) { - AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; - ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server.")); + sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server.")); } else { - FireCompletion(origCompletionDelegate, null, null); + sendStatusFromServerTcs.SetResult(null); } } @@ -346,15 +344,17 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleReadFinished(bool success, byte[] receivedMessage) { + // if success == false, received message will be null. It that case we will + // treat this completion as the last read an rely on C core to handle the failed + // read (e.g. deliver approriate statusCode on the clientside). + TRead msg = default(TRead); var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null; - AsyncCompletionDelegate<TRead> origCompletionDelegate = null; + TaskCompletionSource<TRead> origTcs = null; lock (myLock) { - origCompletionDelegate = readCompletionDelegate; - readCompletionDelegate = null; - + origTcs = streamingReadTcs; if (receivedMessage == null) { // This was the last read. @@ -364,20 +364,25 @@ namespace Grpc.Core.Internal if (deserializeException != null && IsClient) { readingDone = true; + + // TODO(jtattermusch): it might be too late to set the status CancelWithStatus(DeserializeResponseFailureStatus); } + if (!readingDone) + { + streamingReadTcs = null; + } + ReleaseResourcesIfPossible(); } - // TODO: handle the case when success==false - if (deserializeException != null && !IsClient) { - FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException)); + origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException)); return; } - FireCompletion(origCompletionDelegate, msg, null); + origTcs.SetResult(msg); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index bea2b3660c..b1566b44a7 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -65,6 +65,15 @@ namespace Grpc.Core.Internal } /// <summary> + /// Only for testing purposes. + /// </summary> + public void InitializeForTesting(INativeCall call) + { + server.AddCallReference(this); + InitializeInternal(call); + } + + /// <summary> /// Starts a server side call. /// </summary> public Task ServerSideCallAsync() @@ -91,11 +100,10 @@ namespace Grpc.Core.Internal /// <summary> /// Receives a streaming request. Only one pending read action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate) + public Task<TRequest> ReadMessageAsync() { - StartReadMessageInternal(completionDelegate); + return ReadMessageInternalAsync(); } /// <summary> @@ -128,24 +136,33 @@ namespace Grpc.Core.Internal } /// <summary> - /// Sends call result status, also indicating server is done with streaming responses. - /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. + /// Sends call result status, indicating we are done with writes. + /// Sending a status different from StatusCode.OK will also implicitly cancel the call. /// </summary> - public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate) + public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite) { + byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null; + var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags); + lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + GrpcPreconditions.CheckState(!disposed); + GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once."); using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) { - call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent); + call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent, + payload, writeFlags); } halfcloseRequested = true; - readingDone = true; - sendCompletionDelegate = completionDelegate; + initialMetadataSent = true; + sendStatusFromServerTcs = new TaskCompletionSource<object>(); + if (optionalWrite != null) + { + streamingWritesCounter++; + } + return sendStatusFromServerTcs.Task; } } @@ -174,12 +191,6 @@ namespace Grpc.Core.Internal get { return false; } } - protected override void CheckReadingAllowed() - { - base.CheckReadingAllowed(); - GrpcPreconditions.CheckArgument(!cancelRequested); - } - protected override void OnAfterReleaseResources() { server.RemoveCallReference(this); @@ -190,12 +201,21 @@ namespace Grpc.Core.Internal /// </summary> private void HandleFinishedServerside(bool success, bool cancelled) { + // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER, + // success will be always set to true. lock (myLock) { finished = true; + if (streamingReadTcs == null) + { + // if there's no pending read, readingDone=true will dispose now. + // if there is a pending read, we will dispose once that read finishes. + readingDone = true; + streamingReadTcs = new TaskCompletionSource<TRequest>(); + streamingReadTcs.SetResult(default(TRequest)); + } ReleaseResourcesIfPossible(); } - // TODO(jtattermusch): handle error if (cancelled) { diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 500653ba5d..244b97d4a4 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -135,13 +135,16 @@ namespace Grpc.Core.Internal } } - public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalPayload, WriteFlags writeFlags) { using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); + var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); - Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); + Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata, + optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index d6e34a0f04..ad9423ff58 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource<TResponse>(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; if (result == null) diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index cbef599139..cd3719cb50 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -78,7 +78,7 @@ namespace Grpc.Core.Internal void StartSendCloseFromClient(SendCompletionHandler callback); - void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags); void StartServerSide(ReceivedCloseOnServerHandler callback); } diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index 9ee0ba3bc0..c277c73ef0 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -421,20 +421,21 @@ namespace Grpc.Core.Internal public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call); public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description); public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call, @@ -593,7 +594,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, @@ -601,7 +602,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] @@ -610,7 +611,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, - BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, @@ -618,7 +619,8 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, + byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 1f83e51548..85b7a4b01e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -75,29 +75,32 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); - var result = await handler(request, context).ConfigureAwait(false); + var response = await handler(request, context).ConfigureAwait(false); status = context.Status; - await responseStream.WriteAsync(result).ConfigureAwait(false); + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -136,24 +139,26 @@ namespace Grpc.Core.Internal { GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false)); var request = requestStream.Current; - // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. - GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false)); await handler(request, responseStream, context).ConfigureAwait(false); status = context.Status; } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -187,33 +192,31 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + Tuple<TResponse,WriteFlags> responseTuple = null; var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken); try { - var result = await handler(requestStream, context).ConfigureAwait(false); + var response = await handler(requestStream, context).ConfigureAwait(false); status = context.Status; - try - { - await responseStream.WriteAsync(result).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - status = Status.DefaultCancelled; - } + responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions)); } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -255,16 +258,20 @@ namespace Grpc.Core.Internal } catch (Exception e) { - Logger.Error(e, "Exception occured in handler."); + if (!(e is RpcException)) + { + Logger.Warning(e, "Exception occured in handler."); + } status = HandlerUtils.StatusFromException(e); } try { - await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false); } - catch (OperationCanceledException) + catch (Exception) { - // Call has been already cancelled. + asyncCall.Cancel(); + throw; } await finishedTask.ConfigureAwait(false); } @@ -282,9 +289,7 @@ namespace Grpc.Core.Internal asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); - var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false); + await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); } } @@ -300,10 +305,14 @@ namespace Grpc.Core.Internal return rpcException.Status; } - // TODO(jtattermusch): what is the right status code here? return new Status(StatusCode.Unknown, "Exception was thrown by handler."); } + public static WriteFlags GetWriteFlags(WriteOptions writeOptions) + { + return writeOptions != null ? writeOptions.Flags : default(WriteFlags); + } + public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken) where TRequest : class where TResponse : class diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index e7be82c318..d76030d1ad 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -68,9 +68,7 @@ namespace Grpc.Core.Internal { throw new InvalidOperationException("Cancellation of individual reads is not supported."); } - var taskSource = new AsyncCompletionTaskSource<TRequest>(); - call.StartReadMessage(taskSource.CompletionDelegate); - var result = await taskSource.Task.ConfigureAwait(false); + var result = await call.ReadMessageAsync().ConfigureAwait(false); this.current = result; return result != null; } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index 03e39efc02..ecfee0bfdd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -57,13 +57,6 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task WriteStatusAsync(Status status, Metadata trailers) - { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); - return taskSource.Task; - } - public Task WriteResponseHeadersAsync(Metadata responseHeaders) { var taskSource = new AsyncCompletionTaskSource<object>(); |