aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-21 16:07:57 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-08-21 16:13:57 -0700
commit4c25efa5195a81141ec1fc1dfa9dca42a74d377a (patch)
treed1140a97bf2af639462a1f3f8145acc06c89fd4d
parent3af838a2d719c65c360f28ee8551c6012f408401 (diff)
support for reading response headers on client side
-rw-r--r--src/csharp/.gitignore1
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs11
-rw-r--r--src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs58
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs16
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs16
-rw-r--r--src/csharp/Grpc.Core/Calls.cs4
-rw-r--r--src/csharp/Grpc.Core/Channel.cs1
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs11
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs4
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c55
11 files changed, 156 insertions, 31 deletions
diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore
index ae48956567..48365e32a5 100644
--- a/src/csharp/.gitignore
+++ b/src/csharp/.gitignore
@@ -5,4 +5,5 @@ test-results
packages
Grpc.v12.suo
TestResult.xml
+/TestResults
*.nupkg
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 1fa895ba71..5747f3ba04 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -137,6 +137,12 @@ namespace Grpc.Core.Internal.Tests
set;
}
+ public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
+ {
+ get;
+ set;
+ }
+
public SendCompletionHandler SendCompletionHandler
{
get;
@@ -206,6 +212,11 @@ namespace Grpc.Core.Internal.Tests
ReceivedMessageHandler = callback;
}
+ public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+ {
+ ReceivedResponseHeadersHandler = callback;
+ }
+
public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
{
SendCompletionHandler = callback;
diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
index 8ad41af1b8..76e36626b1 100644
--- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
@@ -32,13 +32,16 @@
#endregion
using System;
+using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
+
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
+
using NUnit.Framework;
namespace Grpc.Core.Tests
@@ -93,6 +96,61 @@ namespace Grpc.Core.Tests
}
[Test]
+ public async Task ResponseHeadersAsync_ClientStreamingCall()
+ {
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ return "PASS";
+ });
+
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
+ await call.RequestStream.CompleteAsync();
+ var responseHeaders = await call.ResponseHeadersAsync;
+
+ Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+ Assert.AreEqual("PASS", await call.ResponseAsync);
+ }
+
+ [Test]
+ public async Task ResponseHeadersAsync_ServerStreamingCall()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ await responseStream.WriteAsync("PASS");
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+ var responseHeaders = await call.ResponseHeadersAsync;
+
+ Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+ CollectionAssert.AreEqual(new [] { "PASS" }, await call.ResponseStream.ToListAsync());
+ }
+
+ [Test]
+ public async Task ResponseHeadersAsync_DuplexStreamingCall()
+ {
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ await context.WriteResponseHeadersAsync(headers);
+ while (await requestStream.MoveNext())
+ {
+ await responseStream.WriteAsync(requestStream.Current);
+ }
+ });
+
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
+ var responseHeaders = await call.ResponseHeadersAsync;
+
+ var messages = new[] { "PASS" };
+ await call.RequestStream.WriteAllAsync(messages);
+
+ Assert.AreEqual("ascii-header", responseHeaders[0].Key);
+ CollectionAssert.AreEqual(messages, await call.ResponseStream.ToListAsync());
+ }
+
+ [Test]
public void WriteResponseHeaders_NullNotAllowed()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index 183c84216a..ee7ba29695 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Threading.Tasks;
namespace Grpc.Core
{
@@ -42,14 +43,16 @@ namespace Grpc.Core
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.requestStream = requestStream;
this.responseStream = responseStream;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -78,6 +81,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index ab2049f269..2853a79ce6 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Threading.Tasks;
namespace Grpc.Core
{
@@ -41,13 +42,15 @@ namespace Grpc.Core
public sealed class AsyncServerStreamingCall<TResponse> : IDisposable
{
readonly IAsyncStreamReader<TResponse> responseStream;
+ readonly Task<Metadata> responseHeadersAsync;
readonly Func<Status> getStatusFunc;
readonly Func<Metadata> getTrailersFunc;
readonly Action disposeAction;
- public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
+ public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction)
{
this.responseStream = responseStream;
+ this.responseHeadersAsync = responseHeadersAsync;
this.getStatusFunc = getStatusFunc;
this.getTrailersFunc = getTrailersFunc;
this.disposeAction = disposeAction;
@@ -65,6 +68,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// Asynchronous access to response headers.
+ /// </summary>
+ public Task<Metadata> ResponseHeadersAsync
+ {
+ get
+ {
+ return this.responseHeadersAsync;
+ }
+ }
+
+ /// <summary>
/// Gets the call status if the call has already finished.
/// Throws InvalidOperationException otherwise.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs
index ada3616aa4..e57ac89db3 100644
--- a/src/csharp/Grpc.Core/Calls.cs
+++ b/src/csharp/Grpc.Core/Calls.cs
@@ -93,7 +93,7 @@ namespace Grpc.Core
var asyncCall = new AsyncCall<TRequest, TResponse>(call);
asyncCall.StartServerStreamingCall(req);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
/// <summary>
@@ -130,7 +130,7 @@ namespace Grpc.Core
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall);
- return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
+ return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel);
}
}
}
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 2f8519dfa3..c11b320a64 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -58,7 +58,6 @@ namespace Grpc.Core
readonly List<ChannelOption> options;
bool shutdownRequested;
- bool disposed;
/// <summary>
/// Creates a channel that connects to a specific host.
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index d687bb6283..1b00b95bc8 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -199,6 +199,7 @@ namespace Grpc.Core.Internal
{
call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
+ call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@@ -219,6 +220,7 @@ namespace Grpc.Core.Internal
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
+ call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders);
}
}
@@ -363,6 +365,14 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Handles receive status completion for calls with streaming response.
+ /// </summary>
+ private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
+ {
+ responseHeadersTcs.SetResult(responseHeaders);
+ }
+
+ /// <summary>
/// Handler for unary response completion.
/// </summary>
private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index ed6747ea93..0f187529e8 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -87,6 +87,10 @@ namespace Grpc.Core.Internal
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
@@ -172,6 +176,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_recv_message(this, ctx).CheckOk();
}
+ public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
+ }
+
public void StartServerSide(ReceivedCloseOnServerHandler callback)
{
var ctx = BatchContextSafeHandle.Create();
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index ef2e230ff8..ed4257d1f4 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -40,6 +40,8 @@ namespace Grpc.Core.Internal
internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage);
+ internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders);
+
internal delegate void SendCompletionHandler(bool success);
internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled);
@@ -67,6 +69,8 @@ namespace Grpc.Core.Internal
void StartReceiveMessage(ReceivedMessageHandler callback);
+ void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback);
+
void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray);
void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata);
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index fc9470f93f..489e219c49 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -595,7 +595,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer,
size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) {
/* TODO: don't use magic number */
- grpc_op ops[5];
+ grpc_op ops[4];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -615,23 +615,18 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
ops[2].flags = 0;
ops[2].reserved = NULL;
- ops[3].op = GRPC_OP_RECV_INITIAL_METADATA;
- ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
- ops[3].flags = 0;
- ops[3].reserved = NULL;
-
- ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- ops[4].data.recv_status_on_client.trailing_metadata =
+ ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[3].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
- ops[4].data.recv_status_on_client.status =
+ ops[3].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
- ops[4].data.recv_status_on_client.status_details =
+ ops[3].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
- ops[4].data.recv_status_on_client.status_details_capacity =
+ ops[3].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
- ops[4].flags = 0;
- ops[4].reserved = NULL;
+ ops[3].flags = 0;
+ ops[3].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
@@ -642,7 +637,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_metadata_array *initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[3];
+ grpc_op ops[2];
ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
grpcsharp_metadata_array_move(&(ctx->send_initial_metadata),
initial_metadata);
@@ -652,28 +647,36 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
ops[0].flags = 0;
ops[0].reserved = NULL;
- ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
- ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
- ops[1].flags = 0;
- ops[1].reserved = NULL;
-
- ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- ops[2].data.recv_status_on_client.trailing_metadata =
+ ops[1].op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ ops[1].data.recv_status_on_client.trailing_metadata =
&(ctx->recv_status_on_client.trailing_metadata);
- ops[2].data.recv_status_on_client.status =
+ ops[1].data.recv_status_on_client.status =
&(ctx->recv_status_on_client.status);
/* not using preallocation for status_details */
- ops[2].data.recv_status_on_client.status_details =
+ ops[1].data.recv_status_on_client.status_details =
&(ctx->recv_status_on_client.status_details);
- ops[2].data.recv_status_on_client.status_details_capacity =
+ ops[1].data.recv_status_on_client.status_details_capacity =
&(ctx->recv_status_on_client.status_details_capacity);
- ops[2].flags = 0;
- ops[2].reserved = NULL;
+ ops[1].flags = 0;
+ ops[1].reserved = NULL;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
NULL);
}
+GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata(
+ grpc_call *call, grpcsharp_batch_context *ctx) {
+ /* TODO: don't use magic number */
+ grpc_op ops[1];
+ ops[0].op = GRPC_OP_RECV_INITIAL_METADATA;
+ ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata);
+ ops[0].flags = 0;
+ ops[0].reserved = NULL;
+
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
+}
+
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
const char *send_buffer, size_t send_buffer_len,