diff options
author | Jan Tattermusch <jtattermusch@google.com> | 2015-07-20 22:34:19 -0700 |
---|---|---|
committer | Jan Tattermusch <jtattermusch@google.com> | 2015-07-20 22:34:19 -0700 |
commit | a0bb06511e139e413f2d7dfde11644f81c29a5c1 (patch) | |
tree | efdd0afc2f9ac01d260f503e6f1f43865ea2f208 | |
parent | 998eb9bcaf8990a9c7ec2709550fb70c72c430dc (diff) |
allow sending trailers from server handler
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 7 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 6 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerCallHandler.cs | 20 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerResponseStream.cs | 4 | ||||
-rw-r--r-- | src/csharp/ext/grpc_csharp_ext.c | 11 |
5 files changed, 28 insertions, 20 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 309067ea9d..f809f4a84c 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -101,14 +101,17 @@ namespace Grpc.Core.Internal /// Only one pending send action is allowed at any given time. /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate<object> completionDelegate) + public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate) { lock (myLock) { Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); CheckSendingAllowed(); - call.StartSendStatusFromServer(status, HandleHalfclosed); + using (var metadataArray = MetadataArraySafeHandle.Create(trailers)) + { + call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray); + } halfcloseRequested = true; readingDone = true; sendCompletionDelegate = completionDelegate; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 3b246ac01b..19dbb83f24 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -81,7 +81,7 @@ namespace Grpc.Core.Internal [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, - BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage); + BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, @@ -159,11 +159,11 @@ namespace Grpc.Core.Internal grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback) + public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); completionRegistry.RegisterBatchCompletion(ctx, callback); - grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail).CheckOk(); + grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk(); } public void StartReceiveMessage(BatchCompletionDelegate callback) diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index f3d3c629bc..ddd2187b3e 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -72,13 +72,13 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + var context = HandlerUtils.NewContext(newRpc); try { Preconditions.CheckArgument(await requestStream.MoveNext()); var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. Preconditions.CheckArgument(!await requestStream.MoveNext()); - var context = HandlerUtils.NewContext(newRpc); var result = await handler(context, request); status = context.Status; await responseStream.WriteAsync(result); @@ -90,7 +90,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -126,14 +126,13 @@ namespace Grpc.Core.Internal var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); Status status; + var context = HandlerUtils.NewContext(newRpc); try { Preconditions.CheckArgument(await requestStream.MoveNext()); var request = requestStream.Current; // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated. Preconditions.CheckArgument(!await requestStream.MoveNext()); - - var context = HandlerUtils.NewContext(newRpc); await handler(context, request, responseStream); status = context.Status; } @@ -145,7 +144,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -179,9 +178,10 @@ namespace Grpc.Core.Internal var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - var context = HandlerUtils.NewContext(newRpc); + Status status; + var context = HandlerUtils.NewContext(newRpc); try { var result = await handler(context, requestStream); @@ -203,7 +203,7 @@ namespace Grpc.Core.Internal try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -237,9 +237,9 @@ namespace Grpc.Core.Internal var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); - var context = HandlerUtils.NewContext(newRpc); Status status; + var context = HandlerUtils.NewContext(newRpc); try { await handler(context, requestStream, responseStream); @@ -252,7 +252,7 @@ namespace Grpc.Core.Internal } try { - await responseStream.WriteStatusAsync(status); + await responseStream.WriteStatusAsync(status, context.ResponseTrailers); } catch (OperationCanceledException) { @@ -277,7 +277,7 @@ namespace Grpc.Core.Internal var requestStream = new ServerRequestStream<byte[], byte[]>(asyncCall); var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall); - await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method.")); + await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, "No such method."), Metadata.Empty); await finishedTask; } } diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index a2d77dd5b7..756dcee87f 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -56,10 +56,10 @@ namespace Grpc.Core.Internal return taskSource.Task; } - public Task WriteStatusAsync(Status status) + public Task WriteStatusAsync(Status status, Metadata trailers) { var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendStatusFromServer(status, taskSource.CompletionDelegate); + call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate); return taskSource.Task; } } diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 6856d89ff1..bd0a259593 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -630,15 +630,20 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code, - const char *status_details) { + const char *status_details, + grpc_metadata_array *trailing_metadata) { /* TODO: don't use magic number */ grpc_op ops[1]; ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER; ops[0].data.send_status_from_server.status = status_code; ops[0].data.send_status_from_server.status_details = gpr_strdup(status_details); - ops[0].data.send_status_from_server.trailing_metadata = NULL; - ops[0].data.send_status_from_server.trailing_metadata_count = 0; + grpcsharp_metadata_array_move(&(ctx->send_status_from_server.trailing_metadata), + trailing_metadata); + ops[0].data.send_status_from_server.trailing_metadata_count = + ctx->send_status_from_server.trailing_metadata.count; + ops[0].data.send_status_from_server.trailing_metadata = + ctx->send_status_from_server.trailing_metadata.metadata; ops[0].flags = 0; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx); |