aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal/AsyncCall.cs
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-04-30 11:56:46 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-05-04 09:21:37 -0700
commita5272b6adc5fb7e8c71b7216b0f5e690980a79b2 (patch)
tree41025f6975bc0058b8bce500d04c01d6546f882c /src/csharp/Grpc.Core/Internal/AsyncCall.cs
parent520ecb18f5b400b9c4e44a56acacc098cfaa7f77 (diff)
A new version C# API based on async/await
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs44
1 files changed, 24 insertions, 20 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index bc72cb78de..fd94771ddd 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -43,7 +43,7 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Handles client side native call lifecycle.
+ /// Manages client side native call lifecycle.
/// </summary>
internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
{
@@ -160,7 +160,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
- public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver, Metadata headers)
+ public void StartServerStreamingCall(TRequest msg, Metadata headers)
{
lock (myLock)
{
@@ -169,17 +169,13 @@ namespace Grpc.Core.Internal
started = true;
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
-
- this.readObserver = readObserver;
byte[] payload = UnsafeSerialize(msg);
-
+
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartServerStreaming(payload, finishedHandler, metadataArray);
}
-
- StartReceiveMessage();
}
}
@@ -187,7 +183,7 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public void StartDuplexStreamingCall(IObserver<TResponse> readObserver, Metadata headers)
+ public void StartDuplexStreamingCall(Metadata headers)
{
lock (myLock)
{
@@ -195,14 +191,10 @@ namespace Grpc.Core.Internal
started = true;
- this.readObserver = readObserver;
-
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
call.StartDuplexStreaming(finishedHandler, metadataArray);
}
-
- StartReceiveMessage();
}
}
@@ -210,17 +202,26 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate)
+ public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
{
StartSendMessageInternal(msg, completionDelegate);
}
/// <summary>
+ /// Receives a streaming response. Only one pending read action is allowed at any given time.
+ /// completionDelegate is called when the operation finishes.
+ /// </summary>
+ public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
+ {
+ StartReadMessageInternal(completionDelegate);
+ }
+
+ /// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate)
+ public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
@@ -235,12 +236,12 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// On client-side, we only fire readObserver.OnCompleted once all messages have been read
+ /// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
/// </summary>
- protected override void CompleteReadObserver()
+ protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
{
- if (readingDone && finishedStatus.HasValue)
+ if (completionDelegate != null && readingDone && finishedStatus.HasValue)
{
bool shouldComplete;
lock (myLock)
@@ -254,11 +255,11 @@ namespace Grpc.Core.Internal
var status = finishedStatus.Value;
if (status.StatusCode != StatusCode.OK)
{
- FireReadObserverOnError(new RpcException(status));
+ FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
}
else
{
- FireReadObserverOnCompleted();
+ FireCompletion(completionDelegate, default(TResponse), null);
}
}
}
@@ -304,15 +305,18 @@ namespace Grpc.Core.Internal
{
var status = ctx.GetReceivedStatus();
+ AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
lock (myLock)
{
finished = true;
finishedStatus = status;
+ origReadCompletionDelegate = readCompletionDelegate;
+
ReleaseResourcesIfPossible();
}
- CompleteReadObserver();
+ ProcessLastRead(origReadCompletionDelegate);
}
}
} \ No newline at end of file