diff options
-rw-r--r-- | src/csharp/Grpc.Core/AsyncClientStreamingCall.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs | 25 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncServerStreamingCall.cs | 24 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncUnaryCall.cs | 106 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Calls.cs | 6 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Grpc.Core.csproj | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 40 |
7 files changed, 200 insertions, 17 deletions
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index d66b0d4974..98ebeea318 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -44,12 +44,16 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> result; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Action disposeAction) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> result, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.result = result; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; } @@ -85,7 +89,7 @@ namespace Grpc.Core } /// <summary> - /// Provides means to provide after the call. + /// Provides means to cleanup after the call. /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. /// As a result, all resources being used by the call should be released eventually. diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index 4c0d5936ac..d76272c59b 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -44,14 +44,19 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Action disposeAction) + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; } + /// <summary> /// Async stream to read streaming responses. @@ -76,6 +81,24 @@ namespace Grpc.Core } /// <summary> + /// Gets the call status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + return getStatusFunc(); + } + + /// <summary> + /// Gets the call trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + return getTrailersFunc(); + } + + /// <summary> /// Provides means to cleanup after the call. /// If the call has already finished normally (request stream has been completed and response stream has been fully read), doesn't do anything. /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index 7a479b9a23..380efcdb0e 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -43,11 +43,15 @@ namespace Grpc.Core public sealed class AsyncServerStreamingCall<TResponse> : IDisposable { readonly IAsyncStreamReader<TResponse> responseStream; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Action disposeAction) + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.responseStream = responseStream; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; } @@ -63,6 +67,24 @@ namespace Grpc.Core } /// <summary> + /// Gets the call status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + return getStatusFunc(); + } + + /// <summary> + /// Gets the call trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + return getTrailersFunc(); + } + + /// <summary> /// Provides means to cleanup after the call. /// If the call has already finished normally (response stream has been fully read), doesn't do anything. /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs new file mode 100644 index 0000000000..c644d477ef --- /dev/null +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -0,0 +1,106 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +namespace Grpc.Core +{ + /// <summary> + /// Return type for single request - single response call. + /// </summary> + public sealed class AsyncUnaryCall<TResponse> : IDisposable + { + readonly Task<TResponse> result; + readonly Func<Status> getStatusFunc; + readonly Func<Metadata> getTrailersFunc; + readonly Action disposeAction; + + public AsyncUnaryCall(Task<TResponse> result, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + { + this.result = result; + this.getStatusFunc = getStatusFunc; + this.getTrailersFunc = getTrailersFunc; + this.disposeAction = disposeAction; + } + + /// <summary> + /// Asynchronous call result. + /// </summary> + public Task<TResponse> Result + { + get + { + return this.result; + } + } + + /// <summary> + /// Allows awaiting this object directly. + /// </summary> + public TaskAwaiter<TResponse> GetAwaiter() + { + return result.GetAwaiter(); + } + + /// <summary> + /// Gets the call status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + return getStatusFunc(); + } + + /// <summary> + /// Gets the call trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + return getTrailersFunc(); + } + + /// <summary> + /// Provides means to cleanup after the call. + /// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything. + /// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call. + /// As a result, all resources being used by the call should be released eventually. + /// </summary> + public void Dispose() + { + disposeAction.Invoke(); + } + } +} diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 9e95182c72..61231ba612 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -73,7 +73,7 @@ namespace Grpc.Core asyncCall.StartServerStreamingCall(req, call.Headers); RegisterCancellationCallback(asyncCall, token); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.Cancel); + return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } public static AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -85,7 +85,7 @@ namespace Grpc.Core var resultTask = asyncCall.ClientStreamingCallAsync(call.Headers); RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.Cancel); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) @@ -98,7 +98,7 @@ namespace Grpc.Core RegisterCancellationCallback(asyncCall, token); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.Cancel); + return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } private static void RegisterCancellationCallback<TRequest, TResponse>(AsyncCall<TRequest, TResponse> asyncCall, CancellationToken token) diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index a227fe5477..3b9b3b6f7e 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -33,13 +33,12 @@ </PropertyGroup> <ItemGroup> <Reference Include="System" /> - <Reference Include="System.Collections.Immutable, Version=1.1.36.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL"> - <SpecificVersion>False</SpecificVersion> - <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> - </Reference> <Reference Include="System.Interactive.Async"> <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath> </Reference> + <Reference Include="System.Collections.Immutable"> + <HintPath>..\packages\System.Collections.Immutable.1.1.36\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath> + </Reference> </ItemGroup> <ItemGroup> <Compile Include="AsyncDuplexStreamingCall.cs" /> @@ -102,6 +101,7 @@ <Compile Include="Internal\CompletionRegistry.cs" /> <Compile Include="Internal\BatchContextSafeHandle.cs" /> <Compile Include="ChannelOptions.cs" /> + <Compile Include="AsyncUnaryCall.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 660ad1c32a..f983dbb759 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -52,8 +52,8 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; - // Set after status is received. Only used for streaming response calls. - Status? finishedStatus; + // Set after status is received. Used for both unary and streaming response calls. + ClientSideStatus? finishedStatus; bool readObserverCompleted; // True if readObserver has already been completed. @@ -249,6 +249,32 @@ namespace Grpc.Core.Internal } /// <summary> + /// Gets the resulting status if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Status GetStatus() + { + lock (myLock) + { + Preconditions.CheckState(finishedStatus.HasValue, "Status can only be accessed once the call has finished."); + return finishedStatus.Value.Status; + } + } + + /// <summary> + /// Gets the trailing metadata if the call has already finished. + /// Throws InvalidOperationException otherwise. + /// </summary> + public Metadata GetTrailers() + { + lock (myLock) + { + Preconditions.CheckState(finishedStatus.HasValue, "Trailers can only be accessed once the call has finished."); + return finishedStatus.Value.Trailers; + } + } + + /// <summary> /// On client-side, we only fire readCompletionDelegate once all messages have been read /// and status has been received. /// </summary> @@ -265,7 +291,7 @@ namespace Grpc.Core.Internal if (shouldComplete) { - var status = finishedStatus.Value; + var status = finishedStatus.Value.Status; if (status.StatusCode != StatusCode.OK) { FireCompletion(completionDelegate, default(TResponse), new RpcException(status)); @@ -288,9 +314,13 @@ namespace Grpc.Core.Internal /// </summary> private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx) { + var fullStatus = ctx.GetReceivedStatusOnClient(); + lock (myLock) { finished = true; + finishedStatus = fullStatus; + halfclosed = true; ReleaseResourcesIfPossible(); @@ -302,7 +332,6 @@ namespace Grpc.Core.Internal return; } - var fullStatus = ctx.GetReceivedStatusOnClient(); var status = fullStatus.Status; if (status.StatusCode != StatusCode.OK) @@ -324,13 +353,12 @@ namespace Grpc.Core.Internal private void HandleFinished(bool success, BatchContextSafeHandle ctx) { var fullStatus = ctx.GetReceivedStatusOnClient(); - var status = fullStatus.Status; AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null; lock (myLock) { finished = true; - finishedStatus = status; + finishedStatus = fullStatus; origReadCompletionDelegate = readCompletionDelegate; |