diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2015-08-21 10:45:39 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2015-08-21 16:13:54 -0700 |
commit | fb34a99d9810cf4cac2c1d20813379d5ea976adf (patch) | |
tree | b908d60d3974b14e131e344ef2ab3d8f5e9daad2 /src/csharp/Grpc.Core | |
parent | ea02eb619d3565a9e03f0cd25e439b01845b6536 (diff) |
reading of response headers for unary response calls
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r-- | src/csharp/Grpc.Core/AsyncClientStreamingCall.cs | 15 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/AsyncUnaryCall.cs | 15 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Calls.cs | 4 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 29 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 4 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/INativeCall.cs | 3 |
6 files changed, 53 insertions, 17 deletions
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index fb9b562c77..dbaa3085c5 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -44,14 +44,16 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> responseAsync; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -69,6 +71,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Async stream to send streaming requests. /// </summary> public IClientStreamWriter<TRequest> RequestStream diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 224e343916..154a17a33e 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -43,13 +43,15 @@ namespace Grpc.Core public sealed class AsyncUnaryCall<TResponse> : IDisposable { readonly Task<TResponse> responseAsync; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -67,6 +69,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Allows awaiting this object directly. /// </summary> public TaskAwaiter<TResponse> GetAwaiter() diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 7067456638..ada3616aa4 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -74,7 +74,7 @@ namespace Grpc.Core { var asyncCall = new AsyncCall<TRequest, TResponse>(call); var asyncResult = asyncCall.UnaryCallAsync(req); - return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// <summary> @@ -110,7 +110,7 @@ namespace Grpc.Core var asyncCall = new AsyncCall<TRequest, TResponse>(call); var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 30d60077f0..132b426424 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -56,6 +56,9 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; + // Response headers set here once received. + TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>(); + // Set after status is received. Used for both unary and streaming response calls. ClientSideStatus? finishedStatus; @@ -110,7 +113,7 @@ namespace Grpc.Core.Internal bool success = (ev.success != 0); try { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage()); + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); } catch (Exception e) { @@ -258,6 +261,17 @@ namespace Grpc.Core.Internal } /// <summary> + /// Get the task that completes once response headers are received. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return responseHeadersTcs.Task; + } + } + + /// <summary> /// Gets the resulting status if the call has already finished. /// Throws InvalidOperationException otherwise. /// </summary> @@ -371,7 +385,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handler for unary response completion. /// </summary> - private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage) + private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) { lock (myLock) { @@ -383,18 +397,13 @@ namespace Grpc.Core.Internal ReleaseResourcesIfPossible(); } - if (!success) - { - var internalError = new Status(StatusCode.Internal, "Internal error occured."); - finishedStatus = new ClientSideStatus(internalError, null); - unaryResponseTcs.SetException(new RpcException(internalError)); - return; - } + responseHeadersTcs.SetResult(responseHeaders); var status = receivedStatus.Status; - if (status.StatusCode != StatusCode.OK) + if (!success || status.StatusCode != StatusCode.OK) { + unaryResponseTcs.SetException(new RpcException(status)); return; } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index e1466da65b..ed6747ea93 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -112,7 +112,7 @@ namespace Grpc.Core.Internal public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -126,7 +126,7 @@ namespace Grpc.Core.Internal public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index 42028e458c..ef2e230ff8 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -33,8 +33,9 @@ using System; namespace Grpc.Core.Internal { - internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage); + internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders); + // Received status for streaming response calls. internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus); internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); |