aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs1
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs30
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs1
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs8
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs17
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c25
9 files changed, 88 insertions, 22 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index c26b0773ba..c8c2449ee6 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -64,6 +64,7 @@ namespace Grpc.Core.Internal
: base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails;
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 0c7694e9a5..9fa0baca87 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
+ protected bool initialMetadataSent;
+ protected long streamingWritesCounter;
+
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(HandleSendFinished, payload, writeFlags);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+
sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ streamingWritesCounter++;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 8d7bdf65aa..9eac7f7b61 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -98,6 +98,34 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Initiates sending a initial metadata.
+ /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
+ /// to make things simpler.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ public void StartSendInitialMetadata(Metadata headers, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
+ Preconditions.CheckState(streamingWritesCounter > 0, "Response headers can only be sent before the first write starts.");
+ CheckSendingAllowed();
+
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ {
+ call.StartSendInitialMetadata(HandleSendFinished, metadataArray, writeFlags);
+ }
+
+ this.initialMetadataSent = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
@@ -111,7 +139,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, writeFlags, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index be1426feb4..02502a6f01 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -70,7 +70,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -142,11 +142,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, writeFlags).CheckOk();
}
- public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags)
+ public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags).CheckOk();
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback, WriteFlags writeFlags)
@@ -156,11 +156,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx, writeFlags).CheckOk();
}
- public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@@ -177,7 +177,7 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
- public void SendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 4a14694977..dd7f4256c4 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -68,6 +68,7 @@ namespace Grpc.Core.Internal
{
return this.writeOptions;
}
+
set
{
writeOptions = value;
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 34db03cc38..74af19dc01 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -304,13 +304,15 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, IHasWriteOptions writeOptionsHolder, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
return new ServerCallContext(
newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken, writeOptionsHolder);
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 1d79241f77..5dcd5a7220 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -64,12 +64,20 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ var taskSource = new AsyncCompletionTaskSource<object>();
+ call.StartSendInitialMetadata(responseHeaders, GetWriteFlags(), taskSource.CompletionDelegate);
+ return taskSource.Task;
+ }
+
public WriteOptions WriteOptions
{
get
{
return writeOptions;
}
+
set
{
writeOptions = value;
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index 5657eb8513..7849df9bb4 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -43,10 +43,8 @@ namespace Grpc.Core
/// <summary>
/// Context for a server-side call.
/// </summary>
- public sealed class ServerCallContext
+ public class ServerCallContext
{
- // TODO(jtattermusch): expose method to send initial metadata back to client
-
private readonly string method;
private readonly string host;
private readonly string peer;
@@ -56,9 +54,11 @@ namespace Grpc.Core
private readonly Metadata responseTrailers = new Metadata();
private Status status = Status.DefaultSuccess;
+ private Func<Metadata, Task> writeHeadersFunc;
private IHasWriteOptions writeOptionsHolder;
- public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken, IHasWriteOptions writeOptionsHolder)
+ public ServerCallContext(string method, string host, string peer, DateTime deadline, Metadata requestHeaders, CancellationToken cancellationToken,
+ Func<Metadata, Task> writeHeadersFunc, IHasWriteOptions writeOptionsHolder)
{
this.method = method;
this.host = host;
@@ -66,8 +66,14 @@ namespace Grpc.Core
this.deadline = deadline;
this.requestHeaders = requestHeaders;
this.cancellationToken = cancellationToken;
+ this.writeHeadersFunc = writeHeadersFunc;
this.writeOptionsHolder = writeOptionsHolder;
}
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ return writeHeadersFunc(responseHeaders);
+ }
/// <summary>Name of method called in this RPC.</summary>
public string Method
@@ -114,7 +120,7 @@ namespace Grpc.Core
}
}
- ///<summary>Cancellation token signals when call is cancelled.</summary>
+ /// <summary>Cancellation token signals when call is cancelled.</summary>
public CancellationToken CancellationToken
{
get
@@ -157,6 +163,7 @@ namespace Grpc.Core
{
return writeOptionsHolder.WriteOptions;
}
+
set
{
writeOptionsHolder.WriteOptions = value;
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 165459371b..cb138064e1 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -651,15 +651,22 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
- const char *send_buffer, size_t send_buffer_len, gpr_uint32 write_flags) {
+ const char *send_buffer, size_t send_buffer_len,
+ gpr_uint32 write_flags,
+ gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[1];
+ grpc_op ops[2];
+ size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_MESSAGE;
ctx->send_message = string_to_byte_buffer(send_buffer, send_buffer_len);
ops[0].data.send_message = ctx->send_message;
ops[0].flags = write_flags;
+ ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[1].data.send_initial_metadata.count = 0;
+ ops[1].data.send_initial_metadata.metadata = NULL;
+ ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, nops, ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -675,9 +682,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
- const char *status_details, grpc_metadata_array *trailing_metadata, gpr_uint32 write_flags) {
+ const char *status_details, grpc_metadata_array *trailing_metadata,
+ gpr_uint32 write_flags, gpr_int32 send_empty_initial_metadata) {
/* TODO: don't use magic number */
- grpc_op ops[1];
+ grpc_op ops[2];
+ size_t nops = send_empty_initial_metadata ? 2 : 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -689,8 +698,12 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[0].data.send_status_from_server.trailing_metadata =
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = write_flags;
+ ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[1].data.send_initial_metadata.count = 0;
+ ops[1].data.send_initial_metadata.metadata = NULL;
+ ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, nops, ctx);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE