diff options
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCallServer.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 39 |
1 files changed, 30 insertions, 9 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index d3a2be553f..171d0c799d 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -43,7 +43,7 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// Handles server side native call lifecycle. + /// Manages server side native call lifecycle. /// </summary> internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> { @@ -57,24 +57,22 @@ namespace Grpc.Core.Internal public void Initialize(CallSafeHandle call) { + DebugStats.ActiveServerCalls.Increment(); InitializeInternal(call); } /// <summary> - /// Starts a server side call. Currently, all server side calls are implemented as duplex - /// streaming call and they are adapted to the appropriate streaming arity. + /// Starts a server side call. /// </summary> - public Task ServerSideCallAsync(IObserver<TRequest> readObserver) + public Task ServerSideCallAsync() { lock (myLock) { Preconditions.CheckNotNull(call); started = true; - this.readObserver = readObserver; call.StartServerSide(finishedServersideHandler); - StartReceiveMessage(); return finishedServersideTcs.Task; } } @@ -83,17 +81,26 @@ namespace Grpc.Core.Internal /// Sends a streaming response. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate) + public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate) { StartSendMessageInternal(msg, completionDelegate); } /// <summary> + /// Receives a streaming request. Only one pending read action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate) + { + StartReadMessageInternal(completionDelegate); + } + + /// <summary> /// Sends call result status, also indicating server is done with streaming responses. /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate completionDelegate) + public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate<object> completionDelegate) { lock (myLock) { @@ -106,18 +113,32 @@ namespace Grpc.Core.Internal } } + protected override void OnReleaseResources() + { + DebugStats.ActiveServerCalls.Decrement(); + } + /// <summary> /// Handles the server side close completion. /// </summary> private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx) { + bool cancelled = ctx.GetReceivedCloseOnServerCancelled(); + lock (myLock) { finished = true; + if (cancelled) + { + // Once we cancel, we don't have to care that much + // about reads and writes. + Cancel(); + } + ReleaseResourcesIfPossible(); } - // TODO: handle error ... + // TODO(jtattermusch): handle error finishedServersideTcs.SetResult(null); } |