aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core/Internal
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core/Internal')
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs116
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs41
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs37
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs23
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs31
9 files changed, 244 insertions, 73 deletions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index bfcb9366a1..0db9d2a515 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -50,7 +50,7 @@ namespace Grpc.Core.Internal
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>();
- Channel channel;
+ readonly CallInvocationDetails<TRequest, TResponse> details;
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
@@ -60,26 +60,19 @@ namespace Grpc.Core.Internal
bool readObserverCompleted; // True if readObserver has already been completed.
- public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
+ public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
- }
-
- public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName, Timespec deadline)
- {
- this.channel = channel;
- var call = channel.Handle.CreateCall(channel.CompletionRegistry, cq, methodName, channel.Target, deadline);
- channel.Environment.DebugStats.ActiveClientCalls.Increment();
- InitializeInternal(call);
+ this.details = callDetails;
+ this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
}
// TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
// it is reusing fair amount of code in this class, so we are leaving it here.
- // TODO: for other calls, you need to call Initialize, this methods calls initialize
- // on its own, so there's a usage inconsistency.
/// <summary>
/// Blocking unary request - unary response call.
/// </summary>
- public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers, DateTime deadline)
+ public TResponse UnaryCall(TRequest msg)
{
using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
{
@@ -89,17 +82,19 @@ namespace Grpc.Core.Internal
lock (myLock)
{
- Initialize(channel, cq, methodName, Timespec.FromDateTime(deadline));
+ Preconditions.CheckState(!started);
started = true;
+ Initialize(cq);
+
halfcloseRequested = true;
readingDone = true;
}
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(payload, ctx, metadataArray);
+ call.StartUnary(ctx, payload, metadataArray, GetWriteFlagsForCall());
var ev = cq.Pluck(ctx.Handle);
bool success = (ev.success != 0);
@@ -129,22 +124,24 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - unary response call.
/// </summary>
- public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers, DateTime deadline)
+ public Task<TResponse> UnaryCallAsync(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
halfcloseRequested = true;
readingDone = true;
byte[] payload = UnsafeSerialize(msg);
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartUnary(payload, HandleUnaryResponse, metadataArray);
+ call.StartUnary(HandleUnaryResponse, payload, metadataArray, GetWriteFlagsForCall());
}
return unaryResponseTcs.Task;
}
@@ -154,17 +151,19 @@ namespace Grpc.Core.Internal
/// Starts a streamed request - unary response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public Task<TResponse> ClientStreamingCallAsync(Metadata headers, DateTime deadline)
+ public Task<TResponse> ClientStreamingCallAsync()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
readingDone = true;
unaryResponseTcs = new TaskCompletionSource<TResponse>();
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartClientStreaming(HandleUnaryResponse, metadataArray);
}
@@ -176,21 +175,23 @@ namespace Grpc.Core.Internal
/// <summary>
/// Starts a unary request - streamed response call.
/// </summary>
- public void StartServerStreamingCall(TRequest msg, Metadata headers, DateTime deadline)
+ public void StartServerStreamingCall(TRequest msg)
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
+
+ Initialize(details.Channel.Environment.CompletionQueue);
+
halfcloseRequested = true;
halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
byte[] payload = UnsafeSerialize(msg);
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
- call.StartServerStreaming(payload, HandleFinished, metadataArray);
+ call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall());
}
}
}
@@ -199,15 +200,16 @@ namespace Grpc.Core.Internal
/// Starts a streaming request - streaming response call.
/// Use StartSendMessage and StartSendCloseFromClient to stream requests.
/// </summary>
- public void StartDuplexStreamingCall(Metadata headers, DateTime deadline)
+ public void StartDuplexStreamingCall()
{
lock (myLock)
{
- Preconditions.CheckNotNull(call);
-
+ Preconditions.CheckState(!started);
started = true;
- using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ Initialize(details.Channel.Environment.CompletionQueue);
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
call.StartDuplexStreaming(HandleFinished, metadataArray);
}
@@ -218,9 +220,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming request. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -277,6 +279,14 @@ namespace Grpc.Core.Internal
}
}
+ public CallInvocationDetails<TRequest, TResponse> Details
+ {
+ get
+ {
+ return this.details;
+ }
+ }
+
/// <summary>
/// On client-side, we only fire readCompletionDelegate once all messages have been read
/// and status has been received.
@@ -309,7 +319,39 @@ namespace Grpc.Core.Internal
protected override void OnReleaseResources()
{
- channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ }
+
+ private void Initialize(CompletionQueueSafeHandle cq)
+ {
+ var propagationToken = details.Options.PropagationToken;
+ var parentCall = propagationToken != null ? propagationToken.ParentCall : CallSafeHandle.NullInstance;
+
+ var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
+ parentCall, ContextPropagationToken.DefaultMask, cq,
+ details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline));
+ details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+ InitializeInternal(call);
+ RegisterCancellationCallback();
+ }
+
+ // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called.
+ private void RegisterCancellationCallback()
+ {
+ var token = details.Options.CancellationToken;
+ if (token.CanBeCanceled)
+ {
+ token.Register(() => this.Cancel());
+ }
+ }
+
+ /// <summary>
+ /// Gets WriteFlags set in callDetails.Options.WriteOptions
+ /// </summary>
+ private WriteFlags GetWriteFlagsForCall()
+ {
+ var writeOptions = details.Options.WriteOptions;
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 38f2a5baeb..9fa0baca87 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -71,6 +71,9 @@ namespace Grpc.Core.Internal
protected bool halfclosed;
protected bool finished; // True if close has been received from the peer.
+ protected bool initialMetadataSent;
+ protected long streamingWritesCounter;
+
public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = Preconditions.CheckNotNull(serializer);
@@ -123,7 +126,7 @@ namespace Grpc.Core.Internal
/// Initiates sending a message. Only one send operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate<object> completionDelegate)
+ protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
byte[] payload = UnsafeSerialize(msg);
@@ -132,8 +135,11 @@ namespace Grpc.Core.Internal
Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
CheckSendingAllowed();
- call.StartSendMessage(payload, HandleSendFinished);
+ call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
+
sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ streamingWritesCounter++;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 513902ee36..3710a65d6b 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -83,9 +83,9 @@ namespace Grpc.Core.Internal
/// Sends a streaming response. Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, AsyncCompletionDelegate<object> completionDelegate)
+ public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
{
- StartSendMessageInternal(msg, completionDelegate);
+ StartSendMessageInternal(msg, writeFlags, completionDelegate);
}
/// <summary>
@@ -98,6 +98,35 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Initiates sending a initial metadata.
+ /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
+ /// to make things simpler.
+ /// completionDelegate is invoked upon completion.
+ /// </summary>
+ public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ {
+ lock (myLock)
+ {
+ Preconditions.CheckNotNull(headers, "metadata");
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ Preconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
+ Preconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
+ CheckSendingAllowed();
+
+ Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+
+ using (var metadataArray = MetadataArraySafeHandle.Create(headers))
+ {
+ call.StartSendInitialMetadata(HandleSendFinished, metadataArray);
+ }
+
+ this.initialMetadataSent = true;
+ sendCompletionDelegate = completionDelegate;
+ }
+ }
+
+ /// <summary>
/// Sends call result status, also indicating server is done with streaming responses.
/// Only one pending send action is allowed at any given time.
/// completionDelegate is called when the operation finishes.
@@ -111,7 +140,7 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(status, HandleHalfclosed, metadataArray);
+ call.StartSendStatusFromServer(HandleHalfclosed, status, metadataArray, !initialMetadataSent);
}
halfcloseRequested = true;
readingDone = true;
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 714749b171..3cb01e29bd 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -42,6 +42,8 @@ namespace Grpc.Core.Internal
/// </summary>
internal class CallSafeHandle : SafeHandleZeroIsInvalid
{
+ public static readonly CallSafeHandle NullInstance = new CallSafeHandle();
+
const uint GRPC_WRITE_BUFFER_HINT = 1;
CompletionRegistry completionRegistry;
@@ -53,7 +55,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -62,7 +64,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
- MetadataArraySafeHandle metadataArray);
+ MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
@@ -70,7 +72,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len);
+ BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -78,7 +80,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
@@ -89,6 +91,10 @@ namespace Grpc.Core.Internal
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
+ static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
+
+ [DllImport("grpc_csharp_ext.dll")]
static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
@@ -103,17 +109,17 @@ namespace Grpc.Core.Internal
this.completionRegistry = completionRegistry;
}
- public void StartUnary(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
- public void StartUnary(byte[] payload, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray)
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
- grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray)
+ grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -124,11 +130,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartServerStreaming(byte[] payload, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray).CheckOk();
+ grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
@@ -138,11 +144,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
- public void StartSendMessage(byte[] payload, BatchCompletionDelegate callback)
+ public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length)).CheckOk();
+ grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
public void StartSendCloseFromClient(BatchCompletionDelegate callback)
@@ -152,11 +158,11 @@ namespace Grpc.Core.Internal
grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
- public void StartSendStatusFromServer(Status status, BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
{
var ctx = BatchContextSafeHandle.Create();
completionRegistry.RegisterBatchCompletion(ctx, callback);
- grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray).CheckOk();
+ grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
}
public void StartReceiveMessage(BatchCompletionDelegate callback)
@@ -173,6 +179,13 @@ namespace Grpc.Core.Internal
grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
+ public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
+ }
+
public void Cancel()
{
grpcsharp_call_cancel(this).CheckOk();
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 20815efbd3..7f03bf4ea5 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -47,7 +47,17 @@ namespace Grpc.Core.Internal
static extern ChannelSafeHandle grpcsharp_secure_channel_create(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs);
[DllImport("grpc_csharp_ext.dll")]
- static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+ static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern ChannelState grpcsharp_channel_check_connectivity_state(ChannelSafeHandle channel, int tryToConnect);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern void grpcsharp_channel_watch_connectivity_state(ChannelSafeHandle channel, ChannelState lastObservedState,
+ Timespec deadline, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ static extern CStringSafeHandle grpcsharp_channel_get_target(ChannelSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_channel_destroy(IntPtr channel);
@@ -66,13 +76,34 @@ namespace Grpc.Core.Internal
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
+ public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline)
{
- var result = grpcsharp_channel_create_call(this, cq, method, host, deadline);
+ var result = grpcsharp_channel_create_call(this, parentCall, propagationMask, cq, method, host, deadline);
result.SetCompletionRegistry(registry);
return result;
}
+ public ChannelState CheckConnectivityState(bool tryToConnect)
+ {
+ return grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
+ }
+
+ public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
+ CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
+ {
+ var ctx = BatchContextSafeHandle.Create();
+ completionRegistry.RegisterBatchCompletion(ctx, callback);
+ grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
+ }
+
+ public string GetTarget()
+ {
+ using (var cstring = grpcsharp_channel_get_target(this))
+ {
+ return cstring.GetValue();
+ }
+ }
+
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 58f493463b..013f00ff6f 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -40,16 +40,18 @@ namespace Grpc.Core.Internal
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
{
readonly AsyncCall<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ClientRequestStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
+ this.writeOptions = call.Details.Options.WriteOptions;
}
public Task WriteAsync(TRequest message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -59,5 +61,24 @@ namespace Grpc.Core.Internal
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return this.writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 19f0e3c57f..688f9f6fec 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,7 +75,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -131,7 +131,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
Preconditions.CheckArgument(await requestStream.MoveNext());
@@ -187,7 +187,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
var result = await handler(requestStream, context);
@@ -247,7 +247,7 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
- var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, asyncCall.CancellationToken);
+ var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
await handler(requestStream, responseStream, context);
@@ -304,13 +304,14 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
- public static ServerCallContext NewContext(ServerRpcNew newRpc, string peer, CancellationToken cancellationToken)
+ public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
+ where TRequest : class
+ where TResponse : class
{
DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
- return new ServerCallContext(
- newRpc.Method, newRpc.Host, peer, realtimeDeadline,
- newRpc.RequestMetadata, cancellationToken);
+ return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
+ newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
index 59238a452c..37a4f5256b 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCredentialsSafeHandle.cs
@@ -42,7 +42,7 @@ namespace Grpc.Core.Internal
internal class ServerCredentialsSafeHandle : SafeHandleZeroIsInvalid
{
[DllImport("grpc_csharp_ext.dll", CharSet = CharSet.Ansi)]
- static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs);
+ static extern ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
[DllImport("grpc_csharp_ext.dll")]
static extern void grpcsharp_server_credentials_release(IntPtr credentials);
@@ -51,12 +51,13 @@ namespace Grpc.Core.Internal
{
}
- public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray)
+ public static ServerCredentialsSafeHandle CreateSslCredentials(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, bool forceClientAuth)
{
Preconditions.CheckArgument(keyCertPairCertChainArray.Length == keyCertPairPrivateKeyArray.Length);
return grpcsharp_ssl_server_credentials_create(pemRootCerts,
keyCertPairCertChainArray, keyCertPairPrivateKeyArray,
- new UIntPtr((ulong)keyCertPairCertChainArray.Length));
+ new UIntPtr((ulong)keyCertPairCertChainArray.Length),
+ forceClientAuth);
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 756dcee87f..03e39efc02 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -38,11 +38,12 @@ namespace Grpc.Core.Internal
/// <summary>
/// Writes responses asynchronously to an underlying AsyncCallServer object.
/// </summary>
- internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>
+ internal class ServerResponseStream<TRequest, TResponse> : IServerStreamWriter<TResponse>, IHasWriteOptions
where TRequest : class
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ WriteOptions writeOptions;
public ServerResponseStream(AsyncCallServer<TRequest, TResponse> call)
{
@@ -52,7 +53,7 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, taskSource.CompletionDelegate);
+ call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
return taskSource.Task;
}
@@ -62,5 +63,31 @@ namespace Grpc.Core.Internal
call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
return taskSource.Task;
}
+
+ public Task WriteResponseHeadersAsync(Metadata responseHeaders)
+ {
+ var taskSource = new AsyncCompletionTaskSource<object>();
+ call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
+ return taskSource.Task;
+ }
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return writeOptions;
+ }
+
+ set
+ {
+ writeOptions = value;
+ }
+ }
+
+ private WriteFlags GetWriteFlags()
+ {
+ var options = writeOptions;
+ return options != null ? options.Flags : default(WriteFlags);
+ }
}
}