From ea02eb619d3565a9e03f0cd25e439b01845b6536 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Wed, 19 Aug 2015 17:26:53 -0700 Subject: introduce INativeCall interface to simplify testing --- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 42 ++++++++++++------------- 1 file changed, 21 insertions(+), 21 deletions(-) (limited to 'src/csharp/Grpc.Core/Internal/CallSafeHandle.cs') diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 3cb01e29bd..e1466da65b 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal /// /// grpc_call from /// - internal class CallSafeHandle : SafeHandleZeroIsInvalid + internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall { public static readonly CallSafeHandle NullInstance = new CallSafeHandle(); @@ -109,10 +109,10 @@ namespace Grpc.Core.Internal this.completionRegistry = completionRegistry; } - public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -123,66 +123,66 @@ namespace Grpc.Core.Internal .CheckOk(); } - public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } - public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } - public void StartSendCloseFromClient(BatchCompletionDelegate callback) + public void StartSendCloseFromClient(SendCompletionHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); } - public void StartReceiveMessage(BatchCompletionDelegate callback) + public void StartReceiveMessage(ReceivedMessageHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); grpcsharp_call_recv_message(this, ctx).CheckOk(); } - public void StartServerSide(BatchCompletionDelegate callback) + public void StartServerSide(ReceivedCloseOnServerHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); grpcsharp_call_start_serverside(this, ctx).CheckOk(); } - public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } -- cgit v1.2.3 From fb34a99d9810cf4cac2c1d20813379d5ea976adf Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 21 Aug 2015 10:45:39 -0700 Subject: reading of response headers for unary response calls --- .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 4 +-- src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs | 19 ++++++++++++++ src/csharp/Grpc.Core/AsyncClientStreamingCall.cs | 15 ++++++++++- src/csharp/Grpc.Core/AsyncUnaryCall.cs | 15 ++++++++++- src/csharp/Grpc.Core/Calls.cs | 4 +-- src/csharp/Grpc.Core/Internal/AsyncCall.cs | 29 ++++++++++++++-------- src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 4 +-- src/csharp/Grpc.Core/Internal/INativeCall.cs | 3 ++- 8 files changed, 74 insertions(+), 19 deletions(-) (limited to 'src/csharp/Grpc.Core/Internal/CallSafeHandle.cs') diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 141af7760c..1fa895ba71 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -66,7 +66,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_CompletionSuccess() { var resultTask = asyncCall.UnaryCallAsync("abc"); - fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }); + fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }, new Metadata()); Assert.IsTrue(resultTask.IsCompleted); Assert.IsTrue(fakeCall.IsDisposed); Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); @@ -76,7 +76,7 @@ namespace Grpc.Core.Internal.Tests public void AsyncUnary_CompletionFailure() { var resultTask = asyncCall.UnaryCallAsync("abc"); - fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(), null); + fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(new Status(StatusCode.Internal, ""), null), new byte[] { 1, 2, 3 }, new Metadata()); Assert.IsTrue(resultTask.IsCompleted); Assert.IsTrue(fakeCall.IsDisposed); diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs index 706006702e..8ad41af1b8 100644 --- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs +++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs @@ -73,6 +73,25 @@ namespace Grpc.Core.Tests server.ShutdownAsync().Wait(); } + [Test] + public async Task ResponseHeadersAsync_UnaryCall() + { + helper.UnaryHandler = new UnaryServerMethod(async (request, context) => + { + await context.WriteResponseHeadersAsync(headers); + return "PASS"; + }); + + var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(), ""); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual(headers.Count, responseHeaders.Count); + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + Assert.AreEqual("abcdefg", responseHeaders[0].Value); + + Assert.AreEqual("PASS", await call.ResponseAsync); + } + [Test] public void WriteResponseHeaders_NullNotAllowed() { diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index fb9b562c77..dbaa3085c5 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -44,14 +44,16 @@ namespace Grpc.Core { readonly IClientStreamWriter requestStream; readonly Task responseAsync; + readonly Task responseHeadersAsync; readonly Func getStatusFunc; readonly Func getTrailersFunc; readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter requestStream, Task responseAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + public AsyncClientStreamingCall(IClientStreamWriter requestStream, Task responseAsync, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -68,6 +70,17 @@ namespace Grpc.Core } } + /// + /// Asynchronous access to response headers. + /// + public Task ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + /// /// Async stream to send streaming requests. /// diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 224e343916..154a17a33e 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -43,13 +43,15 @@ namespace Grpc.Core public sealed class AsyncUnaryCall : IDisposable { readonly Task responseAsync; + readonly Task responseHeadersAsync; readonly Func getStatusFunc; readonly Func getTrailersFunc; readonly Action disposeAction; - public AsyncUnaryCall(Task responseAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + public AsyncUnaryCall(Task responseAsync, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) { this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -66,6 +68,17 @@ namespace Grpc.Core } } + /// + /// Asynchronous access to response headers. + /// + public Task ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + /// /// Allows awaiting this object directly. /// diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 7067456638..ada3616aa4 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -74,7 +74,7 @@ namespace Grpc.Core { var asyncCall = new AsyncCall(call); var asyncResult = asyncCall.UnaryCallAsync(req); - return new AsyncUnaryCall(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncUnaryCall(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// @@ -110,7 +110,7 @@ namespace Grpc.Core var asyncCall = new AsyncCall(call); var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream(asyncCall); - return new AsyncClientStreamingCall(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncClientStreamingCall(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 30d60077f0..132b426424 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -56,6 +56,9 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource unaryResponseTcs; + // Response headers set here once received. + TaskCompletionSource responseHeadersTcs = new TaskCompletionSource(); + // Set after status is received. Used for both unary and streaming response calls. ClientSideStatus? finishedStatus; @@ -110,7 +113,7 @@ namespace Grpc.Core.Internal bool success = (ev.success != 0); try { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage()); + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); } catch (Exception e) { @@ -257,6 +260,17 @@ namespace Grpc.Core.Internal } } + /// + /// Get the task that completes once response headers are received. + /// + public Task ResponseHeadersAsync + { + get + { + return responseHeadersTcs.Task; + } + } + /// /// Gets the resulting status if the call has already finished. /// Throws InvalidOperationException otherwise. @@ -371,7 +385,7 @@ namespace Grpc.Core.Internal /// /// Handler for unary response completion. /// - private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage) + private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) { lock (myLock) { @@ -383,18 +397,13 @@ namespace Grpc.Core.Internal ReleaseResourcesIfPossible(); } - if (!success) - { - var internalError = new Status(StatusCode.Internal, "Internal error occured."); - finishedStatus = new ClientSideStatus(internalError, null); - unaryResponseTcs.SetException(new RpcException(internalError)); - return; - } + responseHeadersTcs.SetResult(responseHeaders); var status = receivedStatus.Status; - if (status.StatusCode != StatusCode.OK) + if (!success || status.StatusCode != StatusCode.OK) { + unaryResponseTcs.SetException(new RpcException(status)); return; } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index e1466da65b..ed6747ea93 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -112,7 +112,7 @@ namespace Grpc.Core.Internal public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -126,7 +126,7 @@ namespace Grpc.Core.Internal public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index 42028e458c..ef2e230ff8 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -33,8 +33,9 @@ using System; namespace Grpc.Core.Internal { - internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage); + internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders); + // Received status for streaming response calls. internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus); internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); -- cgit v1.2.3 From 4c25efa5195a81141ec1fc1dfa9dca42a74d377a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 21 Aug 2015 16:07:57 -0700 Subject: support for reading response headers on client side --- src/csharp/.gitignore | 1 + .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 11 ++++ src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs | 58 ++++++++++++++++++++++ src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs | 16 +++++- src/csharp/Grpc.Core/AsyncServerStreamingCall.cs | 16 +++++- src/csharp/Grpc.Core/Calls.cs | 4 +- src/csharp/Grpc.Core/Channel.cs | 1 - src/csharp/Grpc.Core/Internal/AsyncCall.cs | 10 ++++ src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 11 ++++ src/csharp/Grpc.Core/Internal/INativeCall.cs | 4 ++ src/csharp/ext/grpc_csharp_ext.c | 55 ++++++++++---------- 11 files changed, 156 insertions(+), 31 deletions(-) (limited to 'src/csharp/Grpc.Core/Internal/CallSafeHandle.cs') diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore index ae48956567..48365e32a5 100644 --- a/src/csharp/.gitignore +++ b/src/csharp/.gitignore @@ -5,4 +5,5 @@ test-results packages Grpc.v12.suo TestResult.xml +/TestResults *.nupkg diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 1fa895ba71..5747f3ba04 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -137,6 +137,12 @@ namespace Grpc.Core.Internal.Tests set; } + public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + { + get; + set; + } + public SendCompletionHandler SendCompletionHandler { get; @@ -206,6 +212,11 @@ namespace Grpc.Core.Internal.Tests ReceivedMessageHandler = callback; } + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + ReceivedResponseHeadersHandler = callback; + } + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) { SendCompletionHandler = callback; diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs index 8ad41af1b8..76e36626b1 100644 --- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs +++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs @@ -32,13 +32,16 @@ #endregion using System; +using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; + using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; + using NUnit.Framework; namespace Grpc.Core.Tests @@ -92,6 +95,61 @@ namespace Grpc.Core.Tests Assert.AreEqual("PASS", await call.ResponseAsync); } + [Test] + public async Task ResponseHeadersAsync_ClientStreamingCall() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod(async (requestStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + return "PASS"; + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + await call.RequestStream.CompleteAsync(); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + Assert.AreEqual("PASS", await call.ResponseAsync); + } + + [Test] + public async Task ResponseHeadersAsync_ServerStreamingCall() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod(async (request, responseStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + await responseStream.WriteAsync("PASS"); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + CollectionAssert.AreEqual(new [] { "PASS" }, await call.ResponseStream.ToListAsync()); + } + + [Test] + public async Task ResponseHeadersAsync_DuplexStreamingCall() + { + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod(async (requestStream, responseStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + while (await requestStream.MoveNext()) + { + await responseStream.WriteAsync(requestStream.Current); + } + }); + + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall()); + var responseHeaders = await call.ResponseHeadersAsync; + + var messages = new[] { "PASS" }; + await call.RequestStream.WriteAllAsync(messages); + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + CollectionAssert.AreEqual(messages, await call.ResponseStream.ToListAsync()); + } + [Test] public void WriteResponseHeaders_NullNotAllowed() { diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index 183c84216a..ee7ba29695 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; namespace Grpc.Core { @@ -42,14 +43,16 @@ namespace Grpc.Core { readonly IClientStreamWriter requestStream; readonly IAsyncStreamReader responseStream; + readonly Task responseHeadersAsync; readonly Func getStatusFunc; readonly Func getTrailersFunc; readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter requestStream, IAsyncStreamReader responseStream, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + public AsyncDuplexStreamingCall(IClientStreamWriter requestStream, IAsyncStreamReader responseStream, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -77,6 +80,17 @@ namespace Grpc.Core } } + /// + /// Asynchronous access to response headers. + /// + public Task ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + /// /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index ab2049f269..2853a79ce6 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; namespace Grpc.Core { @@ -41,13 +42,15 @@ namespace Grpc.Core public sealed class AsyncServerStreamingCall : IDisposable { readonly IAsyncStreamReader responseStream; + readonly Task responseHeadersAsync; readonly Func getStatusFunc; readonly Func getTrailersFunc; readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader responseStream, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + public AsyncServerStreamingCall(IAsyncStreamReader responseStream, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) { this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -64,6 +67,17 @@ namespace Grpc.Core } } + /// + /// Asynchronous access to response headers. + /// + public Task ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + /// /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index ada3616aa4..e57ac89db3 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -93,7 +93,7 @@ namespace Grpc.Core var asyncCall = new AsyncCall(call); asyncCall.StartServerStreamingCall(req); var responseStream = new ClientResponseStream(asyncCall); - return new AsyncServerStreamingCall(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncServerStreamingCall(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// @@ -130,7 +130,7 @@ namespace Grpc.Core asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream(asyncCall); var responseStream = new ClientResponseStream(asyncCall); - return new AsyncDuplexStreamingCall(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncDuplexStreamingCall(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 2f8519dfa3..c11b320a64 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -58,7 +58,6 @@ namespace Grpc.Core readonly List options; bool shutdownRequested; - bool disposed; /// /// Creates a channel that connects to a specific host. diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index d687bb6283..1b00b95bc8 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -199,6 +199,7 @@ namespace Grpc.Core.Internal { call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -219,6 +220,7 @@ namespace Grpc.Core.Internal { call.StartDuplexStreaming(HandleFinished, metadataArray); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -362,6 +364,14 @@ namespace Grpc.Core.Internal return writeOptions != null ? writeOptions.Flags : default(WriteFlags); } + /// + /// Handles receive status completion for calls with streaming response. + /// + private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders) + { + responseHeadersTcs.SetResult(responseHeaders); + } + /// /// Handler for unary response completion. /// diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index ed6747ea93..0f187529e8 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -86,6 +86,10 @@ namespace Grpc.Core.Internal static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, BatchContextSafeHandle ctx); + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call, + BatchContextSafeHandle ctx); + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, BatchContextSafeHandle ctx); @@ -172,6 +176,13 @@ namespace Grpc.Core.Internal grpcsharp_call_recv_message(this, ctx).CheckOk(); } + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + } + public void StartServerSide(ReceivedCloseOnServerHandler callback) { var ctx = BatchContextSafeHandle.Create(); diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index ef2e230ff8..ed4257d1f4 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -40,6 +40,8 @@ namespace Grpc.Core.Internal internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); + internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders); + internal delegate void SendCompletionHandler(bool success); internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled); @@ -67,6 +69,8 @@ namespace Grpc.Core.Internal void StartReceiveMessage(ReceivedMessageHandler callback); + void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback); + void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray); void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata); diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index fc9470f93f..489e219c49 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -595,7 +595,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) { /* TODO: don't use magic number */ - grpc_op ops[5]; + grpc_op ops[4]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -615,23 +615,18 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[2].flags = 0; ops[2].reserved = NULL; - ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); - ops[3].flags = 0; - ops[3].reserved = NULL; - - ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[4].data.recv_status_on_client.trailing_metadata = + ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[3].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[4].data.recv_status_on_client.status = + ops[3].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[4].data.recv_status_on_client.status_details = + ops[3].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[4].data.recv_status_on_client.status_details_capacity = + ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); - ops[4].flags = 0; - ops[4].reserved = NULL; + ops[3].flags = 0; + ops[3].reserved = NULL; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, NULL); @@ -642,7 +637,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, grpcsharp_batch_context *ctx, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ - grpc_op ops[3]; + grpc_op ops[2]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -652,28 +647,36 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, ops[0].flags = 0; ops[0].reserved = NULL; - ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); - ops[1].flags = 0; - ops[1].reserved = NULL; - - ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[2].data.recv_status_on_client.trailing_metadata = + ops[1].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[1].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[2].data.recv_status_on_client.status = + ops[1].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[2].data.recv_status_on_client.status_details = + ops[1].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[2].data.recv_status_on_client.status_details_capacity = + ops[1].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); - ops[2].flags = 0; - ops[2].reserved = NULL; + ops[1].flags = 0; + ops[1].reserved = NULL; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, NULL); } +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata( + grpc_call *call, grpcsharp_batch_context *ctx) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + ops[0].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[0].flags = 0; + ops[0].reserved = NULL; + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); +} + GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, -- cgit v1.2.3