diff options
author | 2015-04-30 11:56:46 -0700 | |
---|---|---|
committer | 2015-05-04 09:21:37 -0700 | |
commit | a5272b6adc5fb7e8c71b7216b0f5e690980a79b2 (patch) | |
tree | 41025f6975bc0058b8bce500d04c01d6546f882c /src/csharp/Grpc.Core/Internal/AsyncCall.cs | |
parent | 520ecb18f5b400b9c4e44a56acacc098cfaa7f77 (diff) |
A new version C# API based on async/await
Diffstat (limited to 'src/csharp/Grpc.Core/Internal/AsyncCall.cs')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 44 |
1 files changed, 24 insertions, 20 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index bc72cb78de..fd94771ddd 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -43,7 +43,7 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// Handles client side native call lifecycle. + /// Manages client side native call lifecycle. /// </summary> internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> { @@ -160,7 +160,7 @@ namespace Grpc.Core.Internal /// <summary> /// Starts a unary request - streamed response call. /// </summary> - public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver, Metadata headers) + public void StartServerStreamingCall(TRequest msg, Metadata headers) { lock (myLock) { @@ -169,17 +169,13 @@ namespace Grpc.Core.Internal started = true; halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. - - this.readObserver = readObserver; byte[] payload = UnsafeSerialize(msg); - + using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { call.StartServerStreaming(payload, finishedHandler, metadataArray); } - - StartReceiveMessage(); } } @@ -187,7 +183,7 @@ namespace Grpc.Core.Internal /// Starts a streaming request - streaming response call. /// Use StartSendMessage and StartSendCloseFromClient to stream requests. /// </summary> - public void StartDuplexStreamingCall(IObserver<TResponse> readObserver, Metadata headers) + public void StartDuplexStreamingCall(Metadata headers) { lock (myLock) { @@ -195,14 +191,10 @@ namespace Grpc.Core.Internal started = true; - this.readObserver = readObserver; - using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { call.StartDuplexStreaming(finishedHandler, metadataArray); } - - StartReceiveMessage(); } } @@ -210,17 +202,26 @@ namespace Grpc.Core.Internal /// Sends a streaming request. Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate) + public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate) { StartSendMessageInternal(msg, completionDelegate); } /// <summary> + /// Receives a streaming response. Only one pending read action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate) + { + StartReadMessageInternal(completionDelegate); + } + + /// <summary> /// Sends halfclose, indicating client is done with streaming requests. /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate) + public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate) { lock (myLock) { @@ -235,12 +236,12 @@ namespace Grpc.Core.Internal } /// <summary> - /// On client-side, we only fire readObserver.OnCompleted once all messages have been read + /// On client-side, we only fire readCompletionDelegate once all messages have been read /// and status has been received. /// </summary> - protected override void CompleteReadObserver() + protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate) { - if (readingDone && finishedStatus.HasValue) + if (completionDelegate != null && readingDone && finishedStatus.HasValue) { bool shouldComplete; lock (myLock) @@ -254,11 +255,11 @@ namespace Grpc.Core.Internal var status = finishedStatus.Value; if (status.StatusCode != StatusCode.OK) { - FireReadObserverOnError(new RpcException(status)); + FireCompletion(completionDelegate, default(TResponse), new RpcException(status)); } else { - FireReadObserverOnCompleted(); + FireCompletion(completionDelegate, default(TResponse), null); } } } @@ -304,15 +305,18 @@ namespace Grpc.Core.Internal { var status = ctx.GetReceivedStatus(); + AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null; lock (myLock) { finished = true; finishedStatus = status; + origReadCompletionDelegate = readCompletionDelegate; + ReleaseResourcesIfPossible(); } - CompleteReadObserver(); + ProcessLastRead(origReadCompletionDelegate); } } }
\ No newline at end of file |