aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@google.com>2015-07-20 22:34:19 -0700
committerGravatar Jan Tattermusch <jtattermusch@google.com>2015-07-20 22:34:19 -0700
commita0bb06511e139e413f2d7dfde11644f81c29a5c1 (patch)
treeefdd0afc2f9ac01d260f503e6f1f43865ea2f208
parent998eb9bcaf8990a9c7ec2709550fb70c72c430dc (diff)
allow sending trailers from server handler
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs4
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c11
5 files changed, 28 insertions, 20 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 309067ea9d..f809f4a84c 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -101,14 +101,17 @@ namespace Grpc.Core.Internal
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
{
lock (myLock)
{
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendStatusFromServer(status, HandleHalfclosed);
+ using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
+ {
+ call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
+ }
halfcloseRequested = true;
readingDone = true;
sendCompletionDelegate = completionDelegate;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 3b246ac01b..19dbb83f24 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -81,7 +81,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -159,11 +159,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback)
+ public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index f3d3c629bc..ddd2187b3e 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -72,13 +72,13 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ var context = HandlerUtils.NewContext(newRpc);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext());
- var context = HandlerUtils.NewContext(newRpc);
var result = await handler(context, request);
status = context.Status;
await responseStream.WriteAsync(result);
@@ -90,7 +90,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatusAsync(status);
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -126,14 +126,13 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ var context = HandlerUtils.NewContext(newRpc);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
Preconditions.CheckArgument(!await requestStream.MoveNext());
-
- var context = HandlerUtils.NewContext(newRpc);
await handler(context, request, responseStream);
status = context.Status;
}
@@ -145,7 +144,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatusAsync(status);
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -179,9 +178,10 @@ namespace Grpc.Core.Internal
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
- var context = HandlerUtils.NewContext(newRpc);
+
Status status;
+ var context = HandlerUtils.NewContext(newRpc);
try
{
var result = await handler(context, requestStream);
@@ -203,7 +203,7 @@ namespace Grpc.Core.Internal
try
{
- await responseStream.WriteStatusAsync(status);
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -237,9 +237,9 @@ namespace Grpc.Core.Internal
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
- var context = HandlerUtils.NewContext(newRpc);
Status status;
+ var context = HandlerUtils.NewContext(newRpc);
try
{
await handler(context, requestStream, responseStream);
@@ -252,7 +252,7 @@ namespace Grpc.Core.Internal
}
try
{
- await responseStream.WriteStatusAsync(status);
+ await responseStream.WriteStatusAsync(status, context.ResponseTrailers);
}
catch (OperationCanceledException)
{
@@ -277,7 +277,7 @@ namespace Grpc.Core.Internal
var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall);
var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
- await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."));
+ await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."), Metadata.Empty);
await finishedTask;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index a2d77dd5b7..756dcee87f 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -56,10 +56,10 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
- public Task WriteStatusAsync(Status status)
+ public Task WriteStatusAsync(Status status, Metadata trailers)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendStatusFromServer(status, taskSource.CompletionDelegate);
+ call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 6856d89ff1..bd0a259593 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -630,15 +630,20 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_status_from_server(grpc_call *call,
grpcsharp_batch_context *ctx,
grpc_status_code status_code,
- const char *status_details) {
+ const char *status_details,
+ grpc_metadata_array *trailing_metadata) {
/* TODO: don't use magic number */
grpc_op ops[1];
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
gpr_strdup(status_details);
- ops[0].data.send_status_from_server.trailing_metadata = NULL;
- ops[0].data.send_status_from_server.trailing_metadata_count = 0;
+ grpcsharp_metadata_array_move(&(ctx->send_status_from_server.trailing_metadata),
+ trailing_metadata);
+ ops[0].data.send_status_from_server.trailing_metadata_count =
+ ctx->send_status_from_server.trailing_metadata.count;
+ ops[0].data.send_status_from_server.trailing_metadata =
+ ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);