From 9e14414415d75bca155e5a85a8b7ac226027459f Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 17 Aug 2015 14:58:09 -0700 Subject: refactor auth interceptors --- src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs') diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index 0979de606f..183c84216a 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -32,8 +32,6 @@ #endregion using System; -using System.Runtime.CompilerServices; -using System.Threading.Tasks; namespace Grpc.Core { -- cgit v1.2.3 From 4c25efa5195a81141ec1fc1dfa9dca42a74d377a Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Fri, 21 Aug 2015 16:07:57 -0700 Subject: support for reading response headers on client side --- src/csharp/.gitignore | 1 + .../Grpc.Core.Tests/Internal/AsyncCallTest.cs | 11 ++++ src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs | 58 ++++++++++++++++++++++ src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs | 16 +++++- src/csharp/Grpc.Core/AsyncServerStreamingCall.cs | 16 +++++- src/csharp/Grpc.Core/Calls.cs | 4 +- src/csharp/Grpc.Core/Channel.cs | 1 - src/csharp/Grpc.Core/Internal/AsyncCall.cs | 10 ++++ src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 11 ++++ src/csharp/Grpc.Core/Internal/INativeCall.cs | 4 ++ src/csharp/ext/grpc_csharp_ext.c | 55 ++++++++++---------- 11 files changed, 156 insertions(+), 31 deletions(-) (limited to 'src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs') diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore index ae48956567..48365e32a5 100644 --- a/src/csharp/.gitignore +++ b/src/csharp/.gitignore @@ -5,4 +5,5 @@ test-results packages Grpc.v12.suo TestResult.xml +/TestResults *.nupkg diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 1fa895ba71..5747f3ba04 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -137,6 +137,12 @@ namespace Grpc.Core.Internal.Tests set; } + public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + { + get; + set; + } + public SendCompletionHandler SendCompletionHandler { get; @@ -206,6 +212,11 @@ namespace Grpc.Core.Internal.Tests ReceivedMessageHandler = callback; } + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + ReceivedResponseHeadersHandler = callback; + } + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) { SendCompletionHandler = callback; diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs index 8ad41af1b8..76e36626b1 100644 --- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs +++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs @@ -32,13 +32,16 @@ #endregion using System; +using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; + using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; + using NUnit.Framework; namespace Grpc.Core.Tests @@ -92,6 +95,61 @@ namespace Grpc.Core.Tests Assert.AreEqual("PASS", await call.ResponseAsync); } + [Test] + public async Task ResponseHeadersAsync_ClientStreamingCall() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod(async (requestStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + return "PASS"; + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + await call.RequestStream.CompleteAsync(); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + Assert.AreEqual("PASS", await call.ResponseAsync); + } + + [Test] + public async Task ResponseHeadersAsync_ServerStreamingCall() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod(async (request, responseStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + await responseStream.WriteAsync("PASS"); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + CollectionAssert.AreEqual(new [] { "PASS" }, await call.ResponseStream.ToListAsync()); + } + + [Test] + public async Task ResponseHeadersAsync_DuplexStreamingCall() + { + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod(async (requestStream, responseStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + while (await requestStream.MoveNext()) + { + await responseStream.WriteAsync(requestStream.Current); + } + }); + + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall()); + var responseHeaders = await call.ResponseHeadersAsync; + + var messages = new[] { "PASS" }; + await call.RequestStream.WriteAllAsync(messages); + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + CollectionAssert.AreEqual(messages, await call.ResponseStream.ToListAsync()); + } + [Test] public void WriteResponseHeaders_NullNotAllowed() { diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index 183c84216a..ee7ba29695 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; namespace Grpc.Core { @@ -42,14 +43,16 @@ namespace Grpc.Core { readonly IClientStreamWriter requestStream; readonly IAsyncStreamReader responseStream; + readonly Task responseHeadersAsync; readonly Func getStatusFunc; readonly Func getTrailersFunc; readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter requestStream, IAsyncStreamReader responseStream, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + public AsyncDuplexStreamingCall(IClientStreamWriter requestStream, IAsyncStreamReader responseStream, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -77,6 +80,17 @@ namespace Grpc.Core } } + /// + /// Asynchronous access to response headers. + /// + public Task ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + /// /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index ab2049f269..2853a79ce6 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; namespace Grpc.Core { @@ -41,13 +42,15 @@ namespace Grpc.Core public sealed class AsyncServerStreamingCall : IDisposable { readonly IAsyncStreamReader responseStream; + readonly Task responseHeadersAsync; readonly Func getStatusFunc; readonly Func getTrailersFunc; readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader responseStream, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) + public AsyncServerStreamingCall(IAsyncStreamReader responseStream, Task responseHeadersAsync, Func getStatusFunc, Func getTrailersFunc, Action disposeAction) { this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -64,6 +67,17 @@ namespace Grpc.Core } } + /// + /// Asynchronous access to response headers. + /// + public Task ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + /// /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index ada3616aa4..e57ac89db3 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -93,7 +93,7 @@ namespace Grpc.Core var asyncCall = new AsyncCall(call); asyncCall.StartServerStreamingCall(req); var responseStream = new ClientResponseStream(asyncCall); - return new AsyncServerStreamingCall(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncServerStreamingCall(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// @@ -130,7 +130,7 @@ namespace Grpc.Core asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream(asyncCall); var responseStream = new ClientResponseStream(asyncCall); - return new AsyncDuplexStreamingCall(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncDuplexStreamingCall(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 2f8519dfa3..c11b320a64 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -58,7 +58,6 @@ namespace Grpc.Core readonly List options; bool shutdownRequested; - bool disposed; /// /// Creates a channel that connects to a specific host. diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index d687bb6283..1b00b95bc8 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -199,6 +199,7 @@ namespace Grpc.Core.Internal { call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -219,6 +220,7 @@ namespace Grpc.Core.Internal { call.StartDuplexStreaming(HandleFinished, metadataArray); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -362,6 +364,14 @@ namespace Grpc.Core.Internal return writeOptions != null ? writeOptions.Flags : default(WriteFlags); } + /// + /// Handles receive status completion for calls with streaming response. + /// + private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders) + { + responseHeadersTcs.SetResult(responseHeaders); + } + /// /// Handler for unary response completion. /// diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index ed6747ea93..0f187529e8 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -86,6 +86,10 @@ namespace Grpc.Core.Internal static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, BatchContextSafeHandle ctx); + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call, + BatchContextSafeHandle ctx); + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, BatchContextSafeHandle ctx); @@ -172,6 +176,13 @@ namespace Grpc.Core.Internal grpcsharp_call_recv_message(this, ctx).CheckOk(); } + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + } + public void StartServerSide(ReceivedCloseOnServerHandler callback) { var ctx = BatchContextSafeHandle.Create(); diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs index ef2e230ff8..ed4257d1f4 100644 --- a/src/csharp/Grpc.Core/Internal/INativeCall.cs +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -40,6 +40,8 @@ namespace Grpc.Core.Internal internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); + internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders); + internal delegate void SendCompletionHandler(bool success); internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled); @@ -67,6 +69,8 @@ namespace Grpc.Core.Internal void StartReceiveMessage(ReceivedMessageHandler callback); + void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback); + void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray); void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata); diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index fc9470f93f..489e219c49 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -595,7 +595,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) { /* TODO: don't use magic number */ - grpc_op ops[5]; + grpc_op ops[4]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -615,23 +615,18 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[2].flags = 0; ops[2].reserved = NULL; - ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); - ops[3].flags = 0; - ops[3].reserved = NULL; - - ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[4].data.recv_status_on_client.trailing_metadata = + ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[3].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[4].data.recv_status_on_client.status = + ops[3].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[4].data.recv_status_on_client.status_details = + ops[3].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[4].data.recv_status_on_client.status_details_capacity = + ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); - ops[4].flags = 0; - ops[4].reserved = NULL; + ops[3].flags = 0; + ops[3].reserved = NULL; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, NULL); @@ -642,7 +637,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, grpcsharp_batch_context *ctx, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ - grpc_op ops[3]; + grpc_op ops[2]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -652,28 +647,36 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, ops[0].flags = 0; ops[0].reserved = NULL; - ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); - ops[1].flags = 0; - ops[1].reserved = NULL; - - ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[2].data.recv_status_on_client.trailing_metadata = + ops[1].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[1].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[2].data.recv_status_on_client.status = + ops[1].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[2].data.recv_status_on_client.status_details = + ops[1].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[2].data.recv_status_on_client.status_details_capacity = + ops[1].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); - ops[2].flags = 0; - ops[2].reserved = NULL; + ops[1].flags = 0; + ops[1].reserved = NULL; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, NULL); } +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata( + grpc_call *call, grpcsharp_batch_context *ctx) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + ops[0].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[0].flags = 0; + ops[0].reserved = NULL; + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); +} + GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, -- cgit v1.2.3