aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCallServer.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs39
1 files changed, 30 insertions, 9 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index d3a2be553f..171d0c799d 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -43,7 +43,7 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Handles server side native call lifecycle.
+ /// Manages server side native call lifecycle.
/// </summary>
internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest>
{
@@ -57,24 +57,22 @@ namespace Grpc.Core.Internal
public void Initialize(CallSafeHandle call)
{
+ DebugStats.ActiveServerCalls.Increment();
InitializeInternal(call);
}
/// <summary>
- /// Starts a server side call. Currently, all server side calls are implemented as duplex
- /// streaming call and they are adapted to the appropriate streaming arity.
+ /// Starts a server side call.
/// </summary>
- public Task ServerSideCallAsync(IObserver<TRequest> readObserver)
+ public Task ServerSideCallAsync()
{
lock (myLock)
{
Preconditions.CheckNotNull(call);
started = true;
- this.readObserver = readObserver;
call.StartServerSide(finishedServersideHandler);
- StartReceiveMessage();
return finishedServersideTcs.Task;
}
}
@@ -83,17 +81,26 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate)
+ public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
{
StartSendMessageInternal(msg, completionDelegate);
}
/// <summary>
+ /// Receives a streaming request. Only one pending read action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
+ {
+ StartReadMessageInternal(completionDelegate);
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate completionDelegate)
+ public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
@@ -106,18 +113,32 @@ namespace Grpc.Core.Internal
}
}
+ protected override void OnReleaseResources()
+ {
+ DebugStats.ActiveServerCalls.Decrement();
+ }
+
/// <summary>
/// Handles the server side close completion.
/// </summary>
private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx)
{
+ bool cancelled = ctx.GetReceivedCloseOnServerCancelled();
+
lock (myLock)
{
finished = true;
+ if (cancelled)
+ {
+ // Once we cancel, we don't have to care that much
+ // about reads and writes.
+ Cancel();
+ }
+
ReleaseResourcesIfPossible();
}
- // TODO: handle error ...
+ // TODO(jtattermusch): handle error
finishedServersideTcs.SetResult(null);
}