aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp/Grpc.Core
diff options
context:
space:
mode:
Diffstat (limited to 'src/csharp/Grpc.Core')
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs4
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs4
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs4
-rw-r--r--src/csharp/Grpc.Core/AsyncUnaryCall.cs4
-rw-r--r--src/csharp/Grpc.Core/CallOptions.cs8
-rw-r--r--src/csharp/Grpc.Core/Channel.cs68
-rw-r--r--src/csharp/Grpc.Core/ChannelState.cs2
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj6
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs170
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs102
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs81
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs36
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs103
-rw-r--r--src/csharp/Grpc.Core/Internal/CallError.cs (renamed from src/csharp/Grpc.Core/Internal/Enums.cs)39
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs26
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs9
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientSideStatus.cs70
-rw-r--r--src/csharp/Grpc.Core/Internal/ClockType.cs53
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs16
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs98
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs91
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs34
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRpcNew.cs (renamed from src/csharp/Grpc.Core/Internal/AsyncCompletion.cs)75
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs27
-rw-r--r--src/csharp/Grpc.Core/Internal/Timespec.cs37
-rw-r--r--src/csharp/Grpc.Core/Properties/AssemblyInfo.cs6
-rw-r--r--src/csharp/Grpc.Core/Server.cs130
-rw-r--r--src/csharp/Grpc.Core/ServerServiceDefinition.cs10
-rw-r--r--src/csharp/Grpc.Core/WriteOptions.cs4
33 files changed, 861 insertions, 494 deletions
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index 5646fed3d9..02b08d2a10 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -127,6 +127,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index e75108c7e5..68fd6d0b05 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -117,6 +117,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index f953091984..5777c72615 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -103,6 +103,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
index 97df8f5e91..d180c27922 100644
--- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs
+++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
@@ -112,6 +112,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs
index 9ca88849ee..35548cfc96 100644
--- a/src/csharp/Grpc.Core/CallOptions.cs
+++ b/src/csharp/Grpc.Core/CallOptions.cs
@@ -88,7 +88,13 @@ namespace Grpc.Core
}
/// <summary>
- /// Token that can be used for cancelling the call.
+ /// Token that can be used for cancelling the call on the client side.
+ /// Cancelling the token will request cancellation
+ /// of the remote call. Best effort will be made to deliver the cancellation
+ /// notification to the server and interaction of the call with the server side
+ /// will be terminated. Unless the call finishes before the cancellation could
+ /// happen (there is an inherent race),
+ /// the call will finish with <c>StatusCode.Cancelled</c> status.
/// </summary>
public CancellationToken CancellationToken
{
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 93a6e6a3d9..4f29c35b32 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -31,7 +31,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -56,6 +55,7 @@ namespace Grpc.Core
readonly string target;
readonly GrpcEnvironment environment;
+ readonly CompletionQueueSafeHandle completionQueue;
readonly ChannelSafeHandle handle;
readonly Dictionary<string, ChannelOption> options;
@@ -67,14 +67,26 @@ namespace Grpc.Core
/// </summary>
/// <param name="target">Target of the channel.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
+ public Channel(string target, ChannelCredentials credentials) :
+ this(target, credentials, null)
+ {
+ }
+
+ /// <summary>
+ /// Creates a channel that connects to a specific host.
+ /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
+ /// </summary>
+ /// <param name="target">Target of the channel.</param>
+ /// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
- public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null)
+ public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options)
{
this.target = GrpcPreconditions.CheckNotNull(target, "target");
this.options = CreateOptionsDictionary(options);
EnsureUserAgentChannelOption(this.options);
this.environment = GrpcEnvironment.AddRef();
+ this.completionQueue = this.environment.PickCompletionQueue();
using (var nativeCredentials = credentials.ToNativeCredentials())
using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values))
{
@@ -87,6 +99,18 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
}
}
+ GrpcEnvironment.RegisterChannel(this);
+ }
+
+ /// <summary>
+ /// Creates a channel that connects to a specific host and port.
+ /// </summary>
+ /// <param name="host">The name or IP address of the host.</param>
+ /// <param name="port">The port.</param>
+ /// <param name="credentials">Credentials to secure the channel.</param>
+ public Channel(string host, int port, ChannelCredentials credentials) :
+ this(host, port, credentials, null)
+ {
}
/// <summary>
@@ -96,14 +120,14 @@ namespace Grpc.Core
/// <param name="port">The port.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
- public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null) :
+ public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) :
this(string.Format("{0}:{1}", host, port), credentials, options)
{
}
/// <summary>
/// Gets current connectivity state of this channel.
- /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned.
+ /// After channel is has been shutdown, <c>ChannelState.Shutdown</c> will be returned.
/// </summary>
public ChannelState State
{
@@ -120,8 +144,8 @@ namespace Grpc.Core
/// </summary>
public Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
{
- GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.FatalFailure,
- "FatalFailure is a terminal state. No further state changes can occur.");
+ GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown,
+ "Shutdown is a terminal state. No further state changes can occur.");
var tcs = new TaskCompletionSource<object>();
var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
var handler = new BatchCompletionDelegate((success, ctx) =>
@@ -135,7 +159,7 @@ namespace Grpc.Core
tcs.SetCanceled();
}
});
- handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler);
+ handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler);
return tcs.Task;
}
@@ -171,7 +195,7 @@ namespace Grpc.Core
/// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,
- /// or channel enters the FatalFailure state, the task is cancelled.
+ /// or channel enters the Shutdown state, the task is cancelled.
/// There is no need to call this explicitly unless your use case requires that.
/// Starting an RPC on a new channel will request connection implicitly.
/// </summary>
@@ -181,9 +205,9 @@ namespace Grpc.Core
var currentState = GetConnectivityState(true);
while (currentState != ChannelState.Ready)
{
- if (currentState == ChannelState.FatalFailure)
+ if (currentState == ChannelState.Shutdown)
{
- throw new OperationCanceledException("Channel has reached FatalFailure state.");
+ throw new OperationCanceledException("Channel has reached Shutdown state.");
}
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
currentState = GetConnectivityState(false);
@@ -191,9 +215,16 @@ namespace Grpc.Core
}
/// <summary>
- /// Waits until there are no more active calls for this channel and then cleans up
- /// resources used by this channel.
+ /// Shuts down the channel cleanly. It is strongly recommended to shutdown
+ /// all previously created channels before exiting from the process.
/// </summary>
+ /// <remarks>
+ /// This method doesn't wait for all calls on this channel to finish (nor does
+ /// it explicitly cancel all outstanding calls). It is user's responsibility to make sure
+ /// all the calls on this channel have finished (successfully or with an error)
+ /// before shutting down the channel to ensure channel shutdown won't impact
+ /// the outcome of those remote calls.
+ /// </remarks>
public async Task ShutdownAsync()
{
lock (myLock)
@@ -201,6 +232,7 @@ namespace Grpc.Core
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
+ GrpcEnvironment.UnregisterChannel(this);
shutdownTokenSource.Cancel();
@@ -212,7 +244,7 @@ namespace Grpc.Core
handle.Dispose();
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
}
internal ChannelSafeHandle Handle
@@ -231,6 +263,14 @@ namespace Grpc.Core
}
}
+ internal CompletionQueueSafeHandle CompletionQueue
+ {
+ get
+ {
+ return this.completionQueue;
+ }
+ }
+
internal void AddCallReference(object call)
{
activeCallCounter.Increment();
@@ -255,7 +295,7 @@ namespace Grpc.Core
}
catch (ObjectDisposedException)
{
- return ChannelState.FatalFailure;
+ return ChannelState.Shutdown;
}
}
diff --git a/src/csharp/Grpc.Core/ChannelState.cs b/src/csharp/Grpc.Core/ChannelState.cs
index d293b98f75..a6c3b2a488 100644
--- a/src/csharp/Grpc.Core/ChannelState.cs
+++ b/src/csharp/Grpc.Core/ChannelState.cs
@@ -64,6 +64,6 @@ namespace Grpc.Core
/// <summary>
/// Channel has seen a failure that it cannot recover from
/// </summary>
- FatalFailure
+ Shutdown
}
}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index 95077a6ca5..a8b7b5f00d 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -74,7 +74,6 @@
<Compile Include="Internal\CallSafeHandle.cs" />
<Compile Include="Internal\ChannelSafeHandle.cs" />
<Compile Include="Internal\CompletionQueueSafeHandle.cs" />
- <Compile Include="Internal\Enums.cs" />
<Compile Include="Internal\SafeHandleZeroIsInvalid.cs" />
<Compile Include="Internal\Timespec.cs" />
<Compile Include="Internal\GrpcThreadPool.cs" />
@@ -87,7 +86,6 @@
<Compile Include="Utils\BenchmarkUtil.cs" />
<Compile Include="ChannelCredentials.cs" />
<Compile Include="Internal\ChannelArgsSafeHandle.cs" />
- <Compile Include="Internal\AsyncCompletion.cs" />
<Compile Include="Internal\AsyncCallBase.cs" />
<Compile Include="Internal\AsyncCallServer.cs" />
<Compile Include="Internal\AsyncCall.cs" />
@@ -134,6 +132,10 @@
<Compile Include="DefaultCallInvoker.cs" />
<Compile Include="Internal\UnimplementedCallInvoker.cs" />
<Compile Include="Internal\InterceptingCallInvoker.cs" />
+ <Compile Include="Internal\ServerRpcNew.cs" />
+ <Compile Include="Internal\ClientSideStatus.cs" />
+ <Compile Include="Internal\ClockType.cs" />
+ <Compile Include="Internal\CallError.cs" />
</ItemGroup>
<ItemGroup>
<None Include="Grpc.Core.nuspec" />
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index bee0ef1d62..0359d9092a 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -32,6 +32,8 @@
#endregion
using System;
+using System.Collections.Generic;
+using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -51,12 +53,17 @@ namespace Grpc.Core
static GrpcEnvironment instance;
static int refCount;
static int? customThreadPoolSize;
+ static int? customCompletionQueueCount;
+ static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
+ static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new ConsoleLogger();
+ readonly object myLock = new object();
readonly GrpcThreadPool threadPool;
- readonly CompletionRegistry completionRegistry;
readonly DebugStats debugStats = new DebugStats();
+ readonly AtomicCounter cqPickerCounter = new AtomicCounter();
+
bool isClosed;
/// <summary>
@@ -65,6 +72,8 @@ namespace Grpc.Core
/// </summary>
internal static GrpcEnvironment AddRef()
{
+ ShutdownHooks.Register();
+
lock (staticLock)
{
refCount++;
@@ -77,21 +86,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
- /// (and blocks until the environment has been fully shutdown).
+ /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero.
/// </summary>
- internal static void Release()
+ internal static async Task ReleaseAsync()
{
+ GrpcEnvironment instanceToShutdown = null;
lock (staticLock)
{
GrpcPreconditions.CheckState(refCount > 0);
refCount--;
if (refCount == 0)
{
- instance.Close();
+ instanceToShutdown = instance;
instance = null;
}
}
+
+ if (instanceToShutdown != null)
+ {
+ await instanceToShutdown.ShutdownAsync();
+ }
}
internal static int GetRefCount()
@@ -102,6 +116,68 @@ namespace Grpc.Core
}
}
+ internal static void RegisterChannel(Channel channel)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(channel);
+ registeredChannels.Add(channel);
+ }
+ }
+
+ internal static void UnregisterChannel(Channel channel)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(channel);
+ GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set.");
+ }
+ }
+
+ internal static void RegisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ registeredServers.Add(server);
+ }
+ }
+
+ internal static void UnregisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
+ }
+ }
+
+ /// <summary>
+ /// Requests shutdown of all channels created by the current process.
+ /// </summary>
+ public static Task ShutdownChannelsAsync()
+ {
+ HashSet<Channel> snapshot = null;
+ lock (staticLock)
+ {
+ snapshot = new HashSet<Channel>(registeredChannels);
+ }
+ return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync()));
+ }
+
+ /// <summary>
+ /// Requests immediate shutdown of all servers created by the current process.
+ /// </summary>
+ public static Task KillServersAsync()
+ {
+ HashSet<Server> snapshot = null;
+ lock (staticLock)
+ {
+ snapshot = new HashSet<Server>(registeredServers);
+ }
+ return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
+ }
+
/// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
@@ -141,39 +217,62 @@ namespace Grpc.Core
}
/// <summary>
+ /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
+ /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
+ /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// </summary>
+ public static void SetCompletionQueueCount(int completionQueueCount)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number");
+ customCompletionQueueCount = completionQueueCount;
+ }
+ }
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
- completionRegistry = new CompletionRegistry(this);
- threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
+ threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault());
threadPool.Start();
}
/// <summary>
- /// Gets the completion registry used by this gRPC environment.
+ /// Gets the completion queues used by this gRPC environment.
/// </summary>
- internal CompletionRegistry CompletionRegistry
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return this.completionRegistry;
+ return this.threadPool.CompletionQueues;
}
}
- /// <summary>
- /// Gets the completion queue used by this gRPC environment.
- /// </summary>
- internal CompletionQueueSafeHandle CompletionQueue
+ internal bool IsAlive
{
get
{
- return this.threadPool.CompletionQueue;
+ return this.threadPool.IsAlive;
}
}
/// <summary>
+ /// Picks a completion queue in a round-robin fashion.
+ /// Shouldn't be invoked on a per-call basis (used at per-channel basis).
+ /// </summary>
+ internal CompletionQueueSafeHandle PickCompletionQueue()
+ {
+ var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
+ return this.threadPool.CompletionQueues.ElementAt(cqIndex);
+ }
+
+ /// <summary>
/// Gets the completion queue used by this gRPC environment.
/// </summary>
internal DebugStats DebugStats
@@ -206,13 +305,13 @@ namespace Grpc.Core
/// <summary>
/// Shuts down this environment.
/// </summary>
- private void Close()
+ private async Task ShutdownAsync()
{
if (isClosed)
{
throw new InvalidOperationException("Close has already been called");
}
- threadPool.Stop();
+ await threadPool.StopAsync().ConfigureAwait(false);
GrpcNativeShutdown();
isClosed = true;
@@ -230,5 +329,42 @@ namespace Grpc.Core
// more work, but seems to work reasonably well for a start.
return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
}
+
+ private int GetCompletionQueueCountOrDefault()
+ {
+ if (customCompletionQueueCount.HasValue)
+ {
+ return customCompletionQueueCount.Value;
+ }
+ // by default, create a completion queue for each thread
+ return GetThreadPoolSizeOrDefault();
+ }
+
+ private static class ShutdownHooks
+ {
+ static object staticLock = new object();
+ static bool hooksRegistered;
+
+ public static void Register()
+ {
+ lock (staticLock)
+ {
+ if (!hooksRegistered)
+ {
+ AppDomain.CurrentDomain.ProcessExit += ShutdownHookHandler;
+ AppDomain.CurrentDomain.DomainUnload += ShutdownHookHandler;
+ }
+ hooksRegistered = true;
+ }
+ }
+
+ /// <summary>
+ /// Handler for AppDomain.DomainUnload and AppDomain.ProcessExit hooks.
+ /// </summary>
+ private static void ShutdownHookHandler(object sender, EventArgs e)
+ {
+ Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync());
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 55351869b5..895be690a5 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -32,12 +32,7 @@
#endregion
using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
using System.Threading.Tasks;
-using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Profiling;
using Grpc.Core.Utils;
@@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
+ // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
// Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
+ // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
// Response headers set here once received.
TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
@@ -67,7 +64,7 @@ namespace Grpc.Core.Internal
ClientSideStatus? finishedStatus;
public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
- : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
+ : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
{
this.details = callDetails.WithOptions(callDetails.Options.Normalize());
this.initialMetadataSent = true; // we always send metadata at the very beginning of the call.
@@ -144,7 +141,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
readingDone = true;
@@ -171,7 +168,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
readingDone = true;
@@ -195,7 +192,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
halfcloseRequested = true;
@@ -220,7 +217,7 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(!started);
started = true;
- Initialize(environment.CompletionQueue);
+ Initialize(details.Channel.CompletionQueue);
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
{
@@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming request. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends halfclose, indicating client is done with streaming requests.
/// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendCloseFromClientAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: true);
+ GrpcPreconditions.CheckState(started);
- if (!disposed && !finished)
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
{
- call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
+ return earlyResult;
}
- else
+
+ if (disposed || finished)
{
// In case the call has already been finished by the serverside,
- // the halfclose has already been done implicitly, so we only
- // emit the notification for the completion delegate.
- Task.Run(() => HandleSendCloseFromClientFinished(true));
+ // the halfclose has already been done implicitly, so just return
+ // completed task here.
+ halfcloseRequested = true;
+ return Task.FromResult<object>(null);
}
+ call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
halfcloseRequested = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
get { return true; }
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ var earlyResult = CheckSendPreconditionsClientSide();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
+
+ if (finishedStatus.HasValue)
+ {
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ // Writing after the call has finished is not a programming error because server can close
+ // the call anytime, so don't throw directly, but let the write task finish with an error.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetException(new RpcException(finishedStatus.Value.Status));
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
+ private Task CheckSendPreconditionsClientSide()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
+
+ if (cancelRequested)
+ {
+ // Return a cancelled task.
+ var tcs = new TaskCompletionSource<object>();
+ tcs.SetCanceled();
+ return tcs.Task;
+ }
+
+ return null;
+ }
+
private void Initialize(CompletionQueueSafeHandle cq)
{
using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
@@ -368,7 +406,7 @@ namespace Grpc.Core.Internal
var credentials = details.Options.Credentials;
using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
{
- var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
+ var result = details.Channel.Handle.CreateCall(
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
return result;
@@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
{
+ // TODO(jtattermusch): handle success==false
responseHeadersTcs.SetResult(responseHeaders);
}
@@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
}
}
- protected override void CheckSendingAllowed(bool allowFinished)
- {
- base.CheckSendingAllowed(true);
-
- // throwing RpcException if we already received status on client
- // side makes the most sense.
- // Note that this throws even for StatusCode.OK.
- if (!allowFinished && finishedStatus.HasValue)
- {
- throw new RpcException(finishedStatus.Value.Status);
- }
- }
-
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 4de23706b2..cb8366c216 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -58,7 +58,6 @@ namespace Grpc.Core.Internal
readonly Func<TWrite, byte[]> serializer;
readonly Func<byte[], TRead> deserializer;
- protected readonly GrpcEnvironment environment;
protected readonly object myLock = new object();
protected INativeCall call;
@@ -67,8 +66,8 @@ namespace Grpc.Core.Internal
protected bool started;
protected bool cancelRequested;
- protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null.
protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
@@ -78,11 +77,10 @@ namespace Grpc.Core.Internal
protected bool initialMetadataSent;
protected long streamingWritesCounter; // Number of streaming send operations started so far.
- public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment)
+ public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer)
{
this.serializer = GrpcPreconditions.CheckNotNull(serializer);
this.deserializer = GrpcPreconditions.CheckNotNull(deserializer);
- this.environment = GrpcPreconditions.CheckNotNull(environment);
}
/// <summary>
@@ -128,28 +126,31 @@ namespace Grpc.Core.Internal
/// <summary>
/// Initiates sending a message. Only one send operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags)
{
byte[] payload = UnsafeSerialize(msg);
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent);
- sendCompletionDelegate = completionDelegate;
initialMetadataSent = true;
streamingWritesCounter++;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
/// <summary>
/// Initiates reading a message. Only one read operation can be active at a time.
- /// completionDelegate is invoked upon completion.
/// </summary>
protected Task<TRead> ReadMessageInternalAsync()
{
@@ -159,7 +160,7 @@ namespace Grpc.Core.Internal
if (readingDone)
{
// the last read that returns null or throws an exception is idempotent
- // and maintain its state.
+ // and maintains its state.
GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
return streamingReadTcs.Task;
}
@@ -183,7 +184,7 @@ namespace Grpc.Core.Internal
{
if (!disposed && call != null)
{
- bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished);
+ bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished);
if (noMoreSendCompletions && readingDone && finished)
{
ReleaseResources();
@@ -213,24 +214,11 @@ namespace Grpc.Core.Internal
{
}
- protected virtual void CheckSendingAllowed(bool allowFinished)
- {
- GrpcPreconditions.CheckState(started);
- CheckNotCancelled();
- GrpcPreconditions.CheckState(!disposed || allowFinished);
-
- GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed.");
- GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished.");
- GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
- }
-
- protected void CheckNotCancelled()
- {
- if (cancelRequested)
- {
- throw new OperationCanceledException("Remote call has been cancelled.");
- }
- }
+ /// <summary>
+ /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send
+ /// logic by directly returning the write operation result task. Normally, null is returned.
+ /// </summary>
+ protected abstract Task CheckSendAllowedOrEarlyResult();
protected byte[] UnsafeSerialize(TWrite msg)
{
@@ -259,39 +247,27 @@ namespace Grpc.Core.Internal
}
}
- protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error)
- {
- try
- {
- completionDelegate(value, error);
- }
- catch (Exception e)
- {
- Logger.Error(e, "Exception occured while invoking completion delegate.");
- }
- }
-
/// <summary>
/// Handles send completion.
/// </summary>
protected void HandleSendFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed"));
+ origTcs.SetException(new InvalidOperationException("Send failed"));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
@@ -300,22 +276,23 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendCloseFromClientFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
+ TaskCompletionSource<object> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
+ origTcs = streamingWriteTcs;
+ streamingWriteTcs = null;
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed."));
+ // TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs).
+ origTcs.SetException(new InvalidOperationException("Sending close from client has failed."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ origTcs.SetResult(null);
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index b1566b44a7..56c23ba3ef 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -51,14 +51,14 @@ namespace Grpc.Core.Internal
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer)
{
this.server = GrpcPreconditions.CheckNotNull(server);
}
- public void Initialize(CallSafeHandle call)
+ public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue)
{
- call.Initialize(environment.CompletionRegistry, environment.CompletionQueue);
+ call.Initialize(completionQueue);
server.AddCallReference(this);
InitializeInternal(call);
@@ -91,11 +91,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Sends a streaming response. Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags)
{
- StartSendMessageInternal(msg, writeFlags, completionDelegate);
+ return SendMessageInternalAsync(msg, writeFlags);
}
/// <summary>
@@ -110,20 +109,22 @@ namespace Grpc.Core.Internal
/// Initiates sending a initial metadata.
/// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation
/// to make things simpler.
- /// completionDelegate is invoked upon completion.
/// </summary>
- public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendInitialMetadataAsync(Metadata headers)
{
lock (myLock)
{
GrpcPreconditions.CheckNotNull(headers, "metadata");
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ GrpcPreconditions.CheckState(started);
GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call.");
GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts.");
- CheckSendingAllowed(allowFinished: false);
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
+ var earlyResult = CheckSendAllowedOrEarlyResult();
+ if (earlyResult != null)
+ {
+ return earlyResult;
+ }
using (var metadataArray = MetadataArraySafeHandle.Create(headers))
{
@@ -131,7 +132,8 @@ namespace Grpc.Core.Internal
}
this.initialMetadataSent = true;
- sendCompletionDelegate = completionDelegate;
+ streamingWriteTcs = new TaskCompletionSource<object>();
+ return streamingWriteTcs.Task;
}
}
@@ -196,6 +198,16 @@ namespace Grpc.Core.Internal
server.RemoveCallReference(this);
}
+ protected override Task CheckSendAllowedOrEarlyResult()
+ {
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed.");
+ GrpcPreconditions.CheckState(!finished, "Already finished.");
+ GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time");
+ GrpcPreconditions.CheckState(!disposed);
+
+ return null;
+ }
+
/// <summary>
/// Handles the server side close completion.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 66d2a66f99..c28a6f64d3 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -120,107 +120,4 @@ namespace Grpc.Core.Internal
return true;
}
}
-
- /// <summary>
- /// Status + metadata received on client side when call finishes.
- /// (when receive_status_on_client operation finishes).
- /// </summary>
- internal struct ClientSideStatus
- {
- readonly Status status;
- readonly Metadata trailers;
-
- public ClientSideStatus(Status status, Metadata trailers)
- {
- this.status = status;
- this.trailers = trailers;
- }
-
- public Status Status
- {
- get
- {
- return this.status;
- }
- }
-
- public Metadata Trailers
- {
- get
- {
- return this.trailers;
- }
- }
- }
-
- /// <summary>
- /// Details of a newly received RPC.
- /// </summary>
- internal struct ServerRpcNew
- {
- readonly Server server;
- readonly CallSafeHandle call;
- readonly string method;
- readonly string host;
- readonly Timespec deadline;
- readonly Metadata requestMetadata;
-
- public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
- {
- this.server = server;
- this.call = call;
- this.method = method;
- this.host = host;
- this.deadline = deadline;
- this.requestMetadata = requestMetadata;
- }
-
- public Server Server
- {
- get
- {
- return this.server;
- }
- }
-
- public CallSafeHandle Call
- {
- get
- {
- return this.call;
- }
- }
-
- public string Method
- {
- get
- {
- return this.method;
- }
- }
-
- public string Host
- {
- get
- {
- return this.host;
- }
- }
-
- public Timespec Deadline
- {
- get
- {
- return this.deadline;
- }
- }
-
- public Metadata RequestMetadata
- {
- get
- {
- return this.requestMetadata;
- }
- }
- }
}
diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/CallError.cs
index 74f86d2a30..541575f5e6 100644
--- a/src/csharp/Grpc.Core/Internal/Enums.cs
+++ b/src/csharp/Grpc.Core/Internal/CallError.cs
@@ -40,7 +40,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// grpc_call_error from grpc/grpc.h
/// </summary>
- internal enum GRPCCallError
+ internal enum CallError
{
/* everything went ok */
OK = 0,
@@ -70,42 +70,9 @@ namespace Grpc.Core.Internal
/// <summary>
/// Checks the call API invocation's result is OK.
/// </summary>
- public static void CheckOk(this GRPCCallError callError)
+ public static void CheckOk(this CallError callError)
{
- GrpcPreconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError);
+ GrpcPreconditions.CheckState(callError == CallError.OK, "Call error: " + callError);
}
}
-
- /// <summary>
- /// grpc_completion_type from grpc/grpc.h
- /// </summary>
- internal enum GRPCCompletionType
- {
- /* Shutting down */
- Shutdown,
-
- /* No event before timeout */
- Timeout,
-
- /* operation completion */
- OpComplete
- }
-
- /// <summary>
- /// gpr_clock_type from grpc/support/time.h
- /// </summary>
- internal enum GPRClockType
- {
- /* Monotonic clock */
- Monotonic,
-
- /* Realtime clock */
- Realtime,
-
- /* Precise clock good for performance profiling. */
- Precise,
-
- /* Timespan - the distance between two time points */
- Timespan
- }
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 244b97d4a4..82361f5797 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -47,16 +47,14 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
const uint GRPC_WRITE_BUFFER_HINT = 1;
- CompletionRegistry completionRegistry;
CompletionQueueSafeHandle completionQueue;
private CallSafeHandle()
{
}
- public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue)
+ public void Initialize(CompletionQueueSafeHandle completionQueue)
{
- this.completionRegistry = completionRegistry;
this.completionQueue = completionQueue;
}
@@ -70,7 +68,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags)
.CheckOk();
}
@@ -90,7 +88,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk();
}
}
@@ -100,7 +98,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk();
}
}
@@ -110,7 +108,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient()));
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk();
}
}
@@ -120,7 +118,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk();
}
}
@@ -130,7 +128,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
@@ -142,7 +140,7 @@ namespace Grpc.Core.Internal
{
var ctx = BatchContextSafeHandle.Create();
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
@@ -153,7 +151,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage()));
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
@@ -163,7 +161,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata()));
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
@@ -173,7 +171,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled()));
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
@@ -183,7 +181,7 @@ namespace Grpc.Core.Internal
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 1dbd1f4e34..62864dff0c 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -63,7 +63,7 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
- public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
+ public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials)
{
using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall"))
{
@@ -72,7 +72,7 @@ namespace Grpc.Core.Internal
{
result.SetCredentials(credentials);
}
- result.Initialize(registry, cq);
+ result.Initialize(cq);
return result;
}
}
@@ -82,11 +82,10 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0);
}
- public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq,
- CompletionRegistry completionRegistry, BatchCompletionDelegate callback)
+ public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback)
{
var ctx = BatchContextSafeHandle.Create();
- completionRegistry.RegisterBatchCompletion(ctx, callback);
+ cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 013f00ff6f..924de028f5 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -50,16 +50,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TRequest message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task CompleteAsync()
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendCloseFromClient(taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendCloseFromClientAsync();
}
public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs
new file mode 100644
index 0000000000..5727e3f11f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs
@@ -0,0 +1,70 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using Grpc.Core;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Status + metadata received on client side when call finishes.
+ /// (when receive_status_on_client operation finishes).
+ /// </summary>
+ internal struct ClientSideStatus
+ {
+ readonly Status status;
+ readonly Metadata trailers;
+
+ public ClientSideStatus(Status status, Metadata trailers)
+ {
+ this.status = status;
+ this.trailers = trailers;
+ }
+
+ public Status Status
+ {
+ get
+ {
+ return this.status;
+ }
+ }
+
+ public Metadata Trailers
+ {
+ get
+ {
+ return this.trailers;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/ClockType.cs b/src/csharp/Grpc.Core/Internal/ClockType.cs
new file mode 100644
index 0000000000..57533c9d2f
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ClockType.cs
@@ -0,0 +1,53 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// gpr_clock_type from grpc/support/time.h
+ /// </summary>
+ internal enum ClockType
+ {
+ /* Monotonic clock */
+ Monotonic,
+
+ /* Realtime clock */
+ Realtime,
+
+ /* Precise clock good for performance profiling. */
+ Precise,
+
+ /* Timespan - the distance between two time points */
+ Timespan
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
index 288680792a..a78e9b70f3 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs
@@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
static readonly NativeMethods Native = NativeMethods.Get();
- public GRPCCompletionType type;
+ public CompletionType type;
public int success;
public IntPtr tag;
@@ -55,5 +55,20 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_sizeof_grpc_event();
}
}
+
+ /// <summary>
+ /// grpc_completion_type from grpc/grpc.h
+ /// </summary>
+ internal enum CompletionType
+ {
+ /* Shutting down */
+ Shutdown,
+
+ /* No event before timeout */
+ Timeout,
+
+ /* operation completion */
+ OpComplete
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
index 91364cdc70..46f5624223 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs
@@ -45,6 +45,7 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
AtomicCounter shutdownRefcount = new AtomicCounter(1);
+ CompletionRegistry completionRegistry;
private CompletionQueueSafeHandle()
{
@@ -53,7 +54,13 @@ namespace Grpc.Core.Internal
public static CompletionQueueSafeHandle Create()
{
return Native.grpcsharp_completion_queue_create();
+ }
+ public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry)
+ {
+ var cq = Native.grpcsharp_completion_queue_create();
+ cq.completionRegistry = completionRegistry;
+ return cq;
}
public CompletionQueueEvent Next()
@@ -83,6 +90,15 @@ namespace Grpc.Core.Internal
DecrementShutdownRefcount();
}
+ /// <summary>
+ /// Completion registry associated with this completion queue.
+ /// Doesn't need to be set if only using Pluck() operations.
+ /// </summary>
+ public CompletionRegistry CompletionRegistry
+ {
+ get { return completionRegistry; }
+ }
+
protected override bool ReleaseHandle()
{
Native.grpcsharp_completion_queue_destroy(handle);
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 4b7124ee74..a446c1f99f 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -33,15 +33,16 @@
using System;
using System.Collections.Generic;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Logging;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
- /// Pool of threads polling on the same completion queue.
+ /// Pool of threads polling on a set of completions queues.
/// </summary>
internal class GrpcThreadPool
{
@@ -51,25 +52,33 @@ namespace Grpc.Core.Internal
readonly object myLock = new object();
readonly List<Thread> threads = new List<Thread>();
readonly int poolSize;
+ readonly int completionQueueCount;
- CompletionQueueSafeHandle cq;
+ bool stopRequested;
- public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
+ IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
+
+ /// <summary>
+ /// Creates a thread pool threads polling on a set of completions queues.
+ /// </summary>
+ /// <param name="environment">Environment.</param>
+ /// <param name="poolSize">Pool size.</param>
+ /// <param name="completionQueueCount">Completion queue count.</param>
+ public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount)
{
this.environment = environment;
this.poolSize = poolSize;
+ this.completionQueueCount = completionQueueCount;
+ GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount,
+ "Thread pool size cannot be smaller than the number of completion queues used.");
}
public void Start()
{
lock (myLock)
{
- if (cq != null)
- {
- throw new InvalidOperationException("Already started.");
- }
-
- cq = CompletionQueueSafeHandle.Create();
+ GrpcPreconditions.CheckState(completionQueues == null, "Already started.");
+ completionQueues = CreateCompletionQueueList(environment, completionQueueCount);
for (int i = 0; i < poolSize; i++)
{
@@ -78,53 +87,85 @@ namespace Grpc.Core.Internal
}
}
- public void Stop()
+ public Task StopAsync()
{
lock (myLock)
{
- cq.Shutdown();
+ GrpcPreconditions.CheckState(!stopRequested, "Stop already requested.");
+ stopRequested = true;
+
+ foreach (var cq in completionQueues)
+ {
+ cq.Shutdown();
+ }
+ }
+
+ return Task.Run(() =>
+ {
foreach (var thread in threads)
{
thread.Join();
}
- cq.Dispose();
+ foreach (var cq in completionQueues)
+ {
+ cq.Dispose();
+ }
+ });
+ }
+
+ /// <summary>
+ /// Returns true if there is at least one thread pool thread that hasn't
+ /// already stopped.
+ /// Threads can either stop because all completion queues shut down or
+ /// because all foreground threads have already shutdown and process is
+ /// going to exit.
+ /// </summary>
+ internal bool IsAlive
+ {
+ get
+ {
+ return threads.Any(t => t.ThreadState != ThreadState.Stopped);
}
}
- internal CompletionQueueSafeHandle CompletionQueue
+ internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
{
get
{
- return cq;
+ return completionQueues;
}
}
- private Thread CreateAndStartThread(int i)
+ private Thread CreateAndStartThread(int threadIndex)
{
- var thread = new Thread(new ThreadStart(RunHandlerLoop));
- thread.IsBackground = false;
+ var cqIndex = threadIndex % completionQueues.Count;
+ var cq = completionQueues.ElementAt(cqIndex);
+
+ var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
+ thread.IsBackground = true;
+ thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
- thread.Name = "grpc " + i;
+
return thread;
}
/// <summary>
/// Body of the polling thread.
/// </summary>
- private void RunHandlerLoop()
+ private void RunHandlerLoop(CompletionQueueSafeHandle cq)
{
CompletionQueueEvent ev;
do
{
ev = cq.Next();
- if (ev.type == GRPCCompletionType.OpComplete)
+ if (ev.type == CompletionQueueEvent.CompletionType.OpComplete)
{
bool success = (ev.success != 0);
IntPtr tag = ev.tag;
try
{
- var callback = environment.CompletionRegistry.Extract(tag);
+ var callback = cq.CompletionRegistry.Extract(tag);
callback(success);
}
catch (Exception e)
@@ -133,7 +174,18 @@ namespace Grpc.Core.Internal
}
}
}
- while (ev.type != GRPCCompletionType.Shutdown);
+ while (ev.type != CompletionQueueEvent.CompletionType.Shutdown);
+ }
+
+ private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount)
+ {
+ var list = new List<CompletionQueueSafeHandle>();
+ for (int i = 0; i < completionQueueCount; i++)
+ {
+ var completionRegistry = new CompletionRegistry(environment);
+ list.Add(CompletionQueueSafeHandle.Create(completionRegistry));
+ }
+ return list.AsReadOnly();
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index 25735d5262..dc9f62fdab 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -50,6 +50,11 @@ namespace Grpc.Core.Internal
{
using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create"))
{
+ if (metadata.Count == 0)
+ {
+ return new MetadataArraySafeHandle();
+ }
+
// TODO(jtattermusch): we might wanna check that the metadata is readonly
var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index c277c73ef0..65607ed120 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -137,6 +137,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release;
public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create;
+ public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue;
public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port;
public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port;
public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start;
@@ -244,6 +245,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library);
this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library);
+ this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library);
this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library);
this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library);
this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library);
@@ -348,6 +350,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release;
this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create;
+ this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue;
this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port;
this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port;
this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start;
@@ -418,33 +421,33 @@ namespace Grpc.Core.Internal
public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2);
public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials);
- public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
- public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
- public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
+ public delegate CallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
+ public delegate CallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
- public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
- public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
- public delegate GRPCCallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call,
+ public delegate CallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
- public delegate GRPCCallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials);
+ public delegate CallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials);
public delegate CStringSafeHandle grpcsharp_call_get_peer_delegate(CallSafeHandle call);
public delegate void grpcsharp_call_destroy_delegate(IntPtr call);
@@ -493,23 +496,24 @@ namespace Grpc.Core.Internal
public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth);
public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials);
- public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+ public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args);
+ public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq);
public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr);
public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds);
public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server);
- public delegate GRPCCallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+ public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server);
public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
public delegate void grpcsharp_server_destroy_delegate(IntPtr server);
- public delegate Timespec gprsharp_now_delegate(GPRClockType clockType);
- public delegate Timespec gprsharp_inf_future_delegate(GPRClockType clockType);
- public delegate Timespec gprsharp_inf_past_delegate(GPRClockType clockType);
+ public delegate Timespec gprsharp_now_delegate(ClockType clockType);
+ public delegate Timespec gprsharp_inf_future_delegate(ClockType clockType);
+ public delegate Timespec gprsharp_inf_past_delegate(ClockType clockType);
- public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, GPRClockType targetClock);
+ public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock);
public delegate int gprsharp_sizeof_timespec_delegate();
- public delegate GRPCCallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
+ public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr);
}
@@ -587,59 +591,59 @@ namespace Grpc.Core.Internal
// CallSafeHandle
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call);
+ public static extern CallError grpcsharp_call_cancel(CallSafeHandle call);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
+ public static extern CallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_unary(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_message(CallSafeHandle call,
BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_recv_message(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_start_serverside(CallSafeHandle call,
BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
+ public static extern CallError grpcsharp_call_send_initial_metadata(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials);
+ public static extern CallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials);
[DllImport("grpc_csharp_ext.dll")]
public static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call);
@@ -773,7 +777,10 @@ namespace Grpc.Core.Internal
// ServerSafeHandle
[DllImport("grpc_csharp_ext.dll")]
- public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args);
+ public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args);
+
+ [DllImport("grpc_csharp_ext.dll")]
+ public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq);
[DllImport("grpc_csharp_ext.dll")]
public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr);
@@ -785,7 +792,7 @@ namespace Grpc.Core.Internal
public static extern void grpcsharp_server_start(ServerSafeHandle server);
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
+ public static extern CallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx);
[DllImport("grpc_csharp_ext.dll")]
public static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server);
@@ -799,16 +806,16 @@ namespace Grpc.Core.Internal
// Timespec
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_now(GPRClockType clockType);
+ public static extern Timespec gprsharp_now(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_inf_future(GPRClockType clockType);
+ public static extern Timespec gprsharp_inf_future(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_inf_past(GPRClockType clockType);
+ public static extern Timespec gprsharp_inf_past(ClockType clockType);
[DllImport("grpc_csharp_ext.dll")]
- public static extern Timespec gprsharp_convert_clock_type(Timespec t, GPRClockType targetClock);
+ public static extern Timespec gprsharp_convert_clock_type(Timespec t, ClockType targetClock);
[DllImport("grpc_csharp_ext.dll")]
public static extern int gprsharp_sizeof_timespec();
@@ -816,7 +823,7 @@ namespace Grpc.Core.Internal
// Testing
[DllImport("grpc_csharp_ext.dll")]
- public static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
+ public static extern CallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback);
[DllImport("grpc_csharp_ext.dll")]
public static extern IntPtr grpcsharp_test_nop(IntPtr ptr);
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 85b7a4b01e..6a2f520163 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -44,7 +44,7 @@ namespace Grpc.Core.Internal
{
internal interface IServerCallHandler
{
- Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment);
+ Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq);
}
internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler
@@ -62,14 +62,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -121,14 +121,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -179,14 +179,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -237,14 +237,14 @@ namespace Grpc.Core.Internal
this.handler = handler;
}
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment, newRpc.Server);
+ newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall);
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
@@ -281,13 +281,13 @@ namespace Grpc.Core.Internal
{
public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler();
- public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment)
+ public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload, environment, newRpc.Server);
+ (payload) => payload, (payload) => payload, newRpc.Server);
- asyncCall.Initialize(newRpc.Call);
+ asyncCall.Initialize(newRpc.Call, cq);
var finishedTask = asyncCall.ServerSideCallAsync();
await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
@@ -317,7 +317,7 @@ namespace Grpc.Core.Internal
where TRequest : class
where TResponse : class
{
- DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime();
+ DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime();
return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline,
newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream);
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index ecfee0bfdd..25b79b4398 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -52,16 +52,12 @@ namespace Grpc.Core.Internal
public Task WriteAsync(TResponse message)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendMessageAsync(message, GetWriteFlags());
}
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate);
- return taskSource.Task;
+ return call.SendInitialMetadataAsync(responseHeaders);
}
public WriteOptions WriteOptions
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs
index 7e86fddb4d..e4f1880bdb 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs
@@ -32,63 +32,78 @@
#endregion
using System;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Runtime.InteropServices;
-using System.Threading;
-using System.Threading.Tasks;
-using Grpc.Core.Internal;
-using Grpc.Core.Utils;
+using Grpc.Core;
namespace Grpc.Core.Internal
{
/// <summary>
- /// If error != null, there's been an error or operation has been cancelled.
+ /// Details of a newly received RPC.
/// </summary>
- internal delegate void AsyncCompletionDelegate<T>(T result, Exception error);
-
- /// <summary>
- /// Helper for transforming AsyncCompletionDelegate into full-fledged Task.
- /// </summary>
- internal class AsyncCompletionTaskSource<T>
+ internal struct ServerRpcNew
{
- readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
- readonly AsyncCompletionDelegate<T> completionDelegate;
+ readonly Server server;
+ readonly CallSafeHandle call;
+ readonly string method;
+ readonly string host;
+ readonly Timespec deadline;
+ readonly Metadata requestMetadata;
+
+ public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
+ {
+ this.server = server;
+ this.call = call;
+ this.method = method;
+ this.host = host;
+ this.deadline = deadline;
+ this.requestMetadata = requestMetadata;
+ }
+
+ public Server Server
+ {
+ get
+ {
+ return this.server;
+ }
+ }
- public AsyncCompletionTaskSource()
+ public CallSafeHandle Call
{
- completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion);
+ get
+ {
+ return this.call;
+ }
}
- public Task<T> Task
+ public string Method
{
get
{
- return tcs.Task;
+ return this.method;
}
}
- public AsyncCompletionDelegate<T> CompletionDelegate
+ public string Host
{
get
{
- return completionDelegate;
+ return this.host;
}
}
- private void HandleCompletion(T value, Exception error)
+ public Timespec Deadline
{
- if (error == null)
+ get
{
- tcs.SetResult(value);
- return;
+ return this.deadline;
}
- if (error is OperationCanceledException)
+ }
+
+ public Metadata RequestMetadata
+ {
+ get
{
- tcs.SetCanceled();
- return;
+ return this.requestMetadata;
}
- tcs.SetException(error);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 6b5f70e220..8581302706 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -31,12 +31,6 @@
#endregion
-using System;
-using System.Collections.Concurrent;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
-using Grpc.Core.Utils;
-
namespace Grpc.Core.Internal
{
/// <summary>
@@ -50,12 +44,17 @@ namespace Grpc.Core.Internal
{
}
- public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
+ public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args)
{
// Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
// Doing so would make object finalizer crash if we end up abandoning the handle.
GrpcEnvironment.GrpcNativeInit();
- return Native.grpcsharp_server_create(cq, args);
+ return Native.grpcsharp_server_create(args);
+ }
+
+ public void RegisterCompletionQueue(CompletionQueueSafeHandle cq)
+ {
+ Native.grpcsharp_server_register_completion_queue(this, cq);
}
public int AddInsecurePort(string addr)
@@ -73,18 +72,18 @@ namespace Grpc.Core.Internal
Native.grpcsharp_server_start(this);
}
- public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment)
+ public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
- environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx);
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
- public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment)
+ public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue)
{
var ctx = BatchContextSafeHandle.Create();
- environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
- Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk();
+ completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback);
+ Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk();
}
protected override bool ReleaseHandle()
diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs
index 56172a5dda..c9fd710e1e 100644
--- a/src/csharp/Grpc.Core/Internal/Timespec.cs
+++ b/src/csharp/Grpc.Core/Internal/Timespec.cs
@@ -49,11 +49,11 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);
- public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, GPRClockType.Realtime)
+ public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, ClockType.Realtime)
{
}
- public Timespec(long tv_sec, int tv_nsec, GPRClockType clock_type)
+ public Timespec(long tv_sec, int tv_nsec, ClockType clock_type)
{
this.tv_sec = tv_sec;
this.tv_nsec = tv_nsec;
@@ -62,7 +62,7 @@ namespace Grpc.Core.Internal
private long tv_sec;
private int tv_nsec;
- private GPRClockType clock_type;
+ private ClockType clock_type;
/// <summary>
/// Timespec a long time in the future.
@@ -71,7 +71,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_inf_future(GPRClockType.Realtime);
+ return new Timespec(long.MaxValue, 0, ClockType.Realtime);
}
}
@@ -82,7 +82,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_inf_past(GPRClockType.Realtime);
+ return new Timespec(long.MinValue, 0, ClockType.Realtime);
}
}
@@ -93,7 +93,7 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_now(GPRClockType.Realtime);
+ return Native.gprsharp_now(ClockType.Realtime);
}
}
@@ -122,7 +122,7 @@ namespace Grpc.Core.Internal
/// <summary>
/// Converts the timespec to desired clock type.
/// </summary>
- public Timespec ToClockType(GPRClockType targetClock)
+ public Timespec ToClockType(ClockType targetClock)
{
return Native.gprsharp_convert_clock_type(this, targetClock);
}
@@ -142,7 +142,7 @@ namespace Grpc.Core.Internal
public DateTime ToDateTime()
{
GrpcPreconditions.CheckState(tv_nsec >= 0 && tv_nsec < NanosPerSecond);
- GrpcPreconditions.CheckState(clock_type == GPRClockType.Realtime);
+ GrpcPreconditions.CheckState(clock_type == ClockType.Realtime);
// fast path for InfFuture
if (this.Equals(InfFuture))
@@ -227,10 +227,11 @@ namespace Grpc.Core.Internal
{
get
{
- return Native.gprsharp_now(GPRClockType.Precise);
+ return Native.gprsharp_now(ClockType.Precise);
}
}
+ // for tests only
internal static int NativeSize
{
get
@@ -238,5 +239,23 @@ namespace Grpc.Core.Internal
return Native.gprsharp_sizeof_timespec();
}
}
+
+ // for tests only
+ internal static Timespec NativeInfFuture
+ {
+ get
+ {
+ return Native.gprsharp_inf_future(ClockType.Realtime);
+ }
+ }
+
+ // for tests only
+ public static Timespec NativeInfPast
+ {
+ get
+ {
+ return Native.gprsharp_inf_past(ClockType.Realtime);
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs
index bde74945fb..370fa98687 100644
--- a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs
+++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs
@@ -16,6 +16,12 @@ using System.Runtime.CompilerServices;
"0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
"27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
"71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
+[assembly: InternalsVisibleTo("Grpc.IntegrationTesting,PublicKey=" +
+ "00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" +
+ "0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" +
+ "27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" +
+ "71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")]
#else
[assembly: InternalsVisibleTo("Grpc.Core.Tests")]
+[assembly: InternalsVisibleTo("Grpc.IntegrationTesting")]
#endif
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index d538a4671f..ae7a8c9a9a 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -34,8 +34,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
-using System.Diagnostics;
-using System.Runtime.InteropServices;
+using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
@@ -48,7 +47,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
- const int InitialAllowRpcTokenCount = 10;
+ const int InitialAllowRpcTokenCountPerCq = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@@ -68,11 +67,19 @@ namespace Grpc.Core
bool startRequested;
volatile bool shutdownRequested;
+
+ /// <summary>
+ /// Creates a new server.
+ /// </summary>
+ public Server() : this(null)
+ {
+ }
+
/// <summary>
- /// Create a new server.
+ /// Creates a new server.
/// </summary>
/// <param name="options">Channel options.</param>
- public Server(IEnumerable<ChannelOption> options = null)
+ public Server(IEnumerable<ChannelOption> options)
{
this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this);
@@ -80,8 +87,14 @@ namespace Grpc.Core
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
- this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs);
+ this.handle = ServerSafeHandle.NewServer(channelArgs);
}
+
+ foreach (var cq in environment.CompletionQueues)
+ {
+ this.handle.RegisterCompletionQueue(cq);
+ }
+ GrpcEnvironment.RegisterServer(this);
}
/// <summary>
@@ -133,9 +146,12 @@ namespace Grpc.Core
// Starting with more than one AllowOneRpc tokens can significantly increase
// unary RPC throughput.
- for (int i = 0; i < InitialAllowRpcTokenCount; i++)
+ for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++)
{
- AllowOneRpc();
+ foreach (var cq in environment.CompletionQueues)
+ {
+ AllowOneRpc(cq);
+ }
}
}
}
@@ -145,41 +161,24 @@ namespace Grpc.Core
/// cleans up used resources. The returned task finishes when shutdown procedure
/// is complete.
/// </summary>
- public async Task ShutdownAsync()
+ /// <remarks>
+ /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
+ /// </remarks>
+ public Task ShutdownAsync()
{
- lock (myLock)
- {
- GrpcPreconditions.CheckState(startRequested);
- GrpcPreconditions.CheckState(!shutdownRequested);
- shutdownRequested = true;
- }
-
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
- await shutdownTcs.Task.ConfigureAwait(false);
- DisposeHandle();
-
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ return ShutdownInternalAsync(false);
}
/// <summary>
/// Requests server shutdown while cancelling all the in-progress calls.
/// The returned task finishes when shutdown procedure is complete.
/// </summary>
- public async Task KillAsync()
+ /// <remarks>
+ /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
+ /// </remarks>
+ public Task KillAsync()
{
- lock (myLock)
- {
- GrpcPreconditions.CheckState(startRequested);
- GrpcPreconditions.CheckState(!shutdownRequested);
- shutdownRequested = true;
- }
-
- handle.ShutdownAndNotify(HandleServerShutdown, environment);
- handle.CancelAllCalls();
- await shutdownTcs.Task.ConfigureAwait(false);
- DisposeHandle();
-
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ return ShutdownInternalAsync(true);
}
internal void AddCallReference(object call)
@@ -198,6 +197,53 @@ namespace Grpc.Core
}
/// <summary>
+ /// Shuts down the server.
+ /// </summary>
+ private async Task ShutdownInternalAsync(bool kill)
+ {
+ lock (myLock)
+ {
+ GrpcPreconditions.CheckState(startRequested);
+ GrpcPreconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+ GrpcEnvironment.UnregisterServer(this);
+
+ var cq = environment.CompletionQueues.First(); // any cq will do
+ handle.ShutdownAndNotify(HandleServerShutdown, cq);
+ if (kill)
+ {
+ handle.CancelAllCalls();
+ }
+
+ await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
+
+ DisposeHandle();
+
+ await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
+ }
+
+ /// <summary>
+ /// In case the environment's threadpool becomes dead, the shutdown completion will
+ /// never be delivered, but we need to release the environment's handle anyway.
+ /// </summary>
+ private async Task ShutdownCompleteOrEnvironmentDeadAsync()
+ {
+ while (true)
+ {
+ var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
+ if (shutdownTcs.Task == task)
+ {
+ return;
+ }
+ if (!environment.IsAlive)
+ {
+ return;
+ }
+ }
+ }
+
+ /// <summary>
/// Adds a service definition.
/// </summary>
private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)
@@ -244,11 +290,11 @@ namespace Grpc.Core
/// <summary>
/// Allows one new RPC call to be received by server.
/// </summary>
- private void AllowOneRpc()
+ private void AllowOneRpc(CompletionQueueSafeHandle cq)
{
if (!shutdownRequested)
{
- handle.RequestCall(HandleNewServerRpc, environment);
+ handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq);
}
}
@@ -265,7 +311,7 @@ namespace Grpc.Core
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
- private async Task HandleCallAsync(ServerRpcNew newRpc)
+ private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq)
{
try
{
@@ -274,7 +320,7 @@ namespace Grpc.Core
{
callHandler = NoSuchMethodCallHandler.Instance;
}
- await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false);
+ await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false);
}
catch (Exception e)
{
@@ -285,9 +331,9 @@ namespace Grpc.Core
/// <summary>
/// Handles the native callback.
/// </summary>
- private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
+ private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq)
{
- Task.Run(() => AllowOneRpc());
+ Task.Run(() => AllowOneRpc(cq));
if (success)
{
@@ -296,7 +342,7 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
- HandleCallAsync(newRpc); // we don't need to await.
+ HandleCallAsync(newRpc, cq); // we don't need to await.
}
}
}
diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
index deb1431ca3..ac08c04bf6 100644
--- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs
+++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
@@ -63,11 +63,10 @@ namespace Grpc.Core
/// <summary>
/// Creates a new builder object for <c>ServerServiceDefinition</c>.
/// </summary>
- /// <param name="serviceName">The service name.</param>
/// <returns>The builder object.</returns>
- public static Builder CreateBuilder(string serviceName)
+ public static Builder CreateBuilder()
{
- return new Builder(serviceName);
+ return new Builder();
}
/// <summary>
@@ -75,16 +74,13 @@ namespace Grpc.Core
/// </summary>
public class Builder
{
- readonly string serviceName;
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
/// <summary>
/// Creates a new instance of builder.
/// </summary>
- /// <param name="serviceName">The service name.</param>
- public Builder(string serviceName)
+ public Builder()
{
- this.serviceName = serviceName;
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/WriteOptions.cs b/src/csharp/Grpc.Core/WriteOptions.cs
index 7523ada84a..4c9706d966 100644
--- a/src/csharp/Grpc.Core/WriteOptions.cs
+++ b/src/csharp/Grpc.Core/WriteOptions.cs
@@ -1,6 +1,6 @@
#region Copyright notice and license
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -64,7 +64,7 @@ namespace Grpc.Core
/// </summary>
public static readonly WriteOptions Default = new WriteOptions();
- private WriteFlags flags;
+ private readonly WriteFlags flags;
/// <summary>
/// Initializes a new instance of <c>WriteOptions</c> class.