aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-05-05 15:53:37 -0700
committerGravatar Jan Tattermusch <jtattermusch@users.noreply.github.com>2016-05-05 15:53:37 -0700
commit2fc526701e7869a80f1f6fb76abdec61f690dab7 (patch)
tree3fbd20618672fdd870b6d1f9c8a9c0943a909460
parent48d833a9d8280216040ef731341502d7d0f157e4 (diff)
parent2901ea55ed17b16107bda30091a8ca0b84f6a926 (diff)
Merge pull request #6434 from jtattermusch/csharp_serverside_unaryresponse_opt
C# improve handling of serverside calls with unary response.
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs12
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs18
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs72
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c30
8 files changed, 94 insertions, 52 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 058371521d..0e204761f6 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -168,7 +168,7 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
- var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata());
+ var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
fakeCall.SendCompletionHandler(true);
fakeCall.SendStatusFromServerHandler(true);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
index 1bec258ca2..909112a47c 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
@@ -165,7 +165,8 @@ namespace Grpc.Core.Internal.Tests
SendCompletionHandler = callback;
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
SendStatusFromServerHandler = callback;
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index eafe2ccab8..b1566b44a7 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -139,8 +139,11 @@ namespace Grpc.Core.Internal
/// Sends call result status, indicating we are done with writes.
/// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public Task SendStatusFromServerAsync(Status status, Metadata trailers)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite)
{
+ byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null;
+ var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags);
+
lock (myLock)
{
GrpcPreconditions.CheckState(started);
@@ -149,11 +152,16 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
+ call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
+ payload, writeFlags);
}
halfcloseRequested = true;
initialMetadataSent = true;
sendStatusFromServerTcs = new TaskCompletionSource<object>();
+ if (optionalWrite != null)
+ {
+ streamingWritesCounter++;
+ }
return sendStatusFromServerTcs.Task;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 500653ba5d..244b97d4a4 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -135,13 +135,16 @@ namespace Grpc.Core.Internal
}
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
+ var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
+ optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index cbef599139..cd3719cb50 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
void StartSendCloseFromClient(SendCompletionHandler callback);
- void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags);
void StartServerSide(ReceivedCloseOnServerHandler callback);
}
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 9ee0ba3bc0..c277c73ef0 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -421,20 +421,21 @@ namespace Grpc.Core.Internal
public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
@@ -593,7 +594,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -601,7 +602,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
@@ -610,7 +611,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -618,7 +619,8 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 00d82d51e8..85b7a4b01e 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,27 +75,32 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- var result = await handler(request, context).ConfigureAwait(false);
+ var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
- await responseStream.WriteAsync(result).ConfigureAwait(false);
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -139,17 +144,21 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -183,33 +192,31 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
- var result = await handler(requestStream, context).ConfigureAwait(false);
+ var response = await handler(requestStream, context).ConfigureAwait(false);
status = context.Status;
- try
- {
- await responseStream.WriteAsync(result).ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- status = Status.DefaultCancelled;
- }
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -251,16 +258,20 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -278,7 +289,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
- await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
@@ -297,6 +308,11 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
+ public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
+ {
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index aeef8a79e9..5b8ff9b819 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -715,10 +715,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata,
- int32_t send_empty_initial_metadata) {
+ int32_t send_empty_initial_metadata, const char* optional_send_buffer,
+ size_t optional_send_buffer_len, uint32_t write_flags) {
/* TODO: don't use magic number */
- grpc_op ops[2];
- size_t nops = send_empty_initial_metadata ? 2 : 1;
+ grpc_op ops[3];
+ size_t nops = 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -731,12 +732,23 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
- ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[1].data.send_initial_metadata.count = 0;
- ops[1].data.send_initial_metadata.metadata = NULL;
- ops[1].flags = 0;
- ops[1].reserved = NULL;
-
+ if (optional_send_buffer) {
+ ops[nops].op = GRPC_OP_SEND_MESSAGE;
+ ctx->send_message = string_to_byte_buffer(optional_send_buffer,
+ optional_send_buffer_len);
+ ops[nops].data.send_message = ctx->send_message;
+ ops[nops].flags = write_flags;
+ ops[nops].reserved = NULL;
+ nops ++;
+ }
+ if (send_empty_initial_metadata) {
+ ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[nops].data.send_initial_metadata.count = 0;
+ ops[nops].data.send_initial_metadata.metadata = NULL;
+ ops[nops].flags = 0;
+ ops[nops].reserved = NULL;
+ nops++;
+ }
return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}