aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/csharp
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2017-12-04 16:25:01 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2017-12-04 16:25:01 -0800
commit6667f15d8e4ea915bd64647e2c7949d37d1bad91 (patch)
treede1f6de5029d7c51ee369efa5294e00736014597 /src/csharp
parent0f91e513d9dc9bee529701ba254933eb7be07b38 (diff)
parent6fa206de8f8a1444fff19a84945a424c0cabb41c (diff)
Merge branch 'master' of github.com:grpc/grpc into backoff_cpp
Diffstat (limited to 'src/csharp')
-rw-r--r--src/csharp/Grpc.Core.Tests/CallCancellationTest.cs182
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs68
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs79
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs33
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs25
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs22
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs33
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs20
-rw-r--r--src/csharp/Grpc.Core/Internal/CompletionRegistry.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs196
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/IObjectPool.cs35
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs11
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs3
-rwxr-xr-xsrc/csharp/Grpc.Core/Version.csproj.include2
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs4
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs13
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ServerRunners.cs13
-rw-r--r--src/csharp/Grpc.IntegrationTesting/StressTestClient.cs6
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TimeStats.cs90
-rw-r--r--src/csharp/Grpc.IntegrationTesting/WallClockStopwatch.cs63
-rw-r--r--src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs6
-rw-r--r--src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs5
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.bat2
-rwxr-xr-xsrc/csharp/build_packages_dotnetcli.sh4
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c13
-rw-r--r--src/csharp/tests.json2
31 files changed, 731 insertions, 226 deletions
diff --git a/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs
new file mode 100644
index 0000000000..e040f52380
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs
@@ -0,0 +1,182 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Profiling;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class CallCancellationTest
+ {
+ const string Host = "127.0.0.1";
+
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ channel.ShutdownAsync().Wait();
+ server.ShutdownAsync().Wait();
+ }
+
+ [Test]
+ public async Task ClientStreamingCall_CancelAfterBegin()
+ {
+ var barrier = new TaskCompletionSource<object>();
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ barrier.SetResult(null);
+ await requestStream.ToListAsync();
+ return "";
+ });
+
+ var cts = new CancellationTokenSource();
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
+
+ await barrier.Task; // make sure the handler has started.
+ cts.Cancel();
+
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await call.ResponseAsync;
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ }
+ }
+
+ [Test]
+ public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()
+ {
+ var handlerStartedBarrier = new TaskCompletionSource<object>();
+ var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>();
+ var successTcs = new TaskCompletionSource<string>();
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ handlerStartedBarrier.SetResult(null);
+
+ // wait for cancellation to be delivered.
+ context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null));
+ await cancelNotificationReceivedBarrier.Task;
+
+ var moveNextResult = await requestStream.MoveNext();
+ successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL");
+ return "";
+ });
+
+ var cts = new CancellationTokenSource();
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
+
+ await handlerStartedBarrier.Task;
+ cts.Cancel();
+
+ try
+ {
+ await call.ResponseAsync;
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ }
+ Assert.AreEqual("SUCCESS", await successTcs.Task);
+ }
+
+ [Test]
+ public async Task ClientStreamingCall_CancelServerSideRead()
+ {
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ var cts = new CancellationTokenSource();
+ var moveNextTask = requestStream.MoveNext(cts.Token);
+ cts.Cancel();
+ await moveNextTask;
+ return "";
+ });
+
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall());
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await call.ResponseAsync;
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ }
+ }
+
+ [Test]
+ public async Task ServerStreamingCall_CancelClientSideRead()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ await responseStream.WriteAsync("abc");
+ while (!context.CancellationToken.IsCancellationRequested)
+ {
+ await Task.Delay(10);
+ }
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+ await call.ResponseStream.MoveNext();
+ Assert.AreEqual("abc", call.ResponseStream.Current);
+
+ var cts = new CancellationTokenSource();
+ var moveNextTask = call.ResponseStream.MoveNext(cts.Token);
+ cts.Cancel();
+
+ try
+ {
+ // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
+ await moveNextTask;
+ Assert.Fail();
+ }
+ catch (RpcException ex)
+ {
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 72d9035a6f..90dd365b07 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -273,74 +273,6 @@ namespace Grpc.Core.Tests
}
[Test]
- public async Task ClientStreamingCall_CancelAfterBegin()
- {
- var barrier = new TaskCompletionSource<object>();
-
- helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
- {
- barrier.SetResult(null);
- await requestStream.ToListAsync();
- return "";
- });
-
- var cts = new CancellationTokenSource();
- var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
-
- await barrier.Task; // make sure the handler has started.
- cts.Cancel();
-
- try
- {
- // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock.
- await call.ResponseAsync;
- Assert.Fail();
- }
- catch (RpcException ex)
- {
- Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
- }
- }
-
- [Test]
- public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()
- {
- var handlerStartedBarrier = new TaskCompletionSource<object>();
- var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>();
- var successTcs = new TaskCompletionSource<string>();
-
- helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
- {
- handlerStartedBarrier.SetResult(null);
-
- // wait for cancellation to be delivered.
- context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null));
- await cancelNotificationReceivedBarrier.Task;
-
- var moveNextResult = await requestStream.MoveNext();
- successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL");
- return "";
- });
-
- var cts = new CancellationTokenSource();
- var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
-
- await handlerStartedBarrier.Task;
- cts.Cancel();
-
- try
- {
- await call.ResponseAsync;
- Assert.Fail();
- }
- catch (RpcException ex)
- {
- Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
- }
- Assert.AreEqual("SUCCESS", await successTcs.Task);
- }
-
- [Test]
public async Task AsyncUnaryCall_EchoMetadata()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) =>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
index 1d9475a8b8..775c950c8c 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
@@ -40,7 +40,7 @@ namespace Grpc.Core.Internal.Tests
public void CreateAsyncAndShutdown()
{
var env = GrpcEnvironment.AddRef();
- var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env));
+ var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create()));
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();
diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs
new file mode 100644
index 0000000000..b6bb0a9eae
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs
@@ -0,0 +1,79 @@
+#region Copyright notice and license
+
+// Copyright 2017 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class DefaultObjectPoolTest
+ {
+ [Test]
+ [TestCase(10, 2)]
+ [TestCase(10, 1)]
+ [TestCase(0, 2)]
+ [TestCase(2, 0)]
+ public void ObjectIsReused(int sharedCapacity, int threadLocalCapacity)
+ {
+ var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), sharedCapacity, threadLocalCapacity);
+ var origLeased = pool.Lease();
+ pool.Return(origLeased);
+ Assert.AreSame(origLeased, pool.Lease());
+ Assert.AreNotSame(origLeased, pool.Lease());
+ }
+
+ [Test]
+ public void ZeroCapacities()
+ {
+ var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 0, 0);
+ var origLeased = pool.Lease();
+ pool.Return(origLeased);
+ Assert.AreNotSame(origLeased, pool.Lease());
+ }
+
+ [Test]
+ public void DisposeCleansSharedPool()
+ {
+ var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, 0);
+ var origLeased = pool.Lease();
+ pool.Return(origLeased);
+ pool.Dispose();
+ Assert.AreNotSame(origLeased, pool.Lease());
+ }
+
+ [Test]
+ public void Constructor()
+ {
+ Assert.Throws<ArgumentNullException>(() => new DefaultObjectPool<TestPooledObject>(null, 10, 2));
+ Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), -1, 10));
+ Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1));
+ }
+
+ class TestPooledObject : IDisposable
+ {
+
+ public void Dispose()
+ {
+
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 80031cb7ef..2b1b5e32d7 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -33,6 +33,8 @@ namespace Grpc.Core
public class GrpcEnvironment
{
const int MinDefaultThreadPoolSize = 4;
+ const int DefaultBatchContextPoolSharedCapacity = 10000;
+ const int DefaultBatchContextPoolThreadLocalCapacity = 64;
static object staticLock = new object();
static GrpcEnvironment instance;
@@ -40,11 +42,14 @@ namespace Grpc.Core
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
static bool inlineHandlers;
+ static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity;
+ static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity;
static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true);
+ readonly IObjectPool<BatchContextSafeHandle> batchContextPool;
readonly GrpcThreadPool threadPool;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
@@ -186,7 +191,7 @@ namespace Grpc.Core
/// <summary>
/// Sets the number of threads in the gRPC thread pool that polls for internal RPC events.
- /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
+ /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
@@ -203,7 +208,7 @@ namespace Grpc.Core
/// <summary>
/// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
- /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
+ /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
/// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing.
/// Most users should rely on the default value provided by gRPC library.
/// Note: this method is part of an experimental API that can change or be removed without any prior notice.
@@ -238,6 +243,26 @@ namespace Grpc.Core
}
/// <summary>
+ /// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances
+ /// instead of creating a new one for every C core operation helps reducing the GC pressure.
+ /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
+ /// This is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// </summary>
+ public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number");
+ GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number");
+ batchContextPoolSharedCapacity = sharedCapacity;
+ batchContextPoolThreadLocalCapacity = threadLocalCapacity;
+ }
+ }
+
+ /// <summary>
/// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic.
/// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
/// </summary>
@@ -249,6 +274,7 @@ namespace Grpc.Core
private GrpcEnvironment()
{
GrpcNativeInit();
+ batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity);
threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
threadPool.Start();
}
@@ -264,6 +290,8 @@ namespace Grpc.Core
}
}
+ internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool;
+
internal bool IsAlive
{
get
@@ -325,6 +353,7 @@ namespace Grpc.Core
await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false);
await threadPool.StopAsync().ConfigureAwait(false);
+ batchContextPool.Dispose();
GrpcNativeShutdown();
isShutdown = true;
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 42bfbb87e0..3751d549e3 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -41,6 +41,13 @@ namespace Grpc.Core
/// (<c>MoveNext</c> will return <c>false</c>) and the <c>CancellationToken</c>
/// associated with the call will be cancelled to signal the failure.
/// </para>
+ /// <para>
+ /// <c>MoveNext()</c> operations can be cancelled via a cancellation token. Cancelling
+ /// an individual read operation has the same effect as cancelling the entire call
+ /// (which will also result in the read operation returning prematurely), but the per-read cancellation
+ /// tokens passed to MoveNext() only result in cancelling the call if the read operation haven't finished
+ /// yet.
+ /// </para>
/// </summary>
/// <typeparam name="T">The message type.</typeparam>
public interface IAsyncStreamReader<T> : IAsyncEnumerator<T>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index aa2161267a..9946d1a6cf 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -92,23 +92,28 @@ namespace Grpc.Core.Internal
}
using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
- using (var ctx = BatchContextSafeHandle.Create())
{
- call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
-
- var ev = cq.Pluck(ctx.Handle);
-
- bool success = (ev.success != 0);
+ var ctx = details.Channel.Environment.BatchContextPool.Lease();
try
{
- using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
+ call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags);
+ var ev = cq.Pluck(ctx.Handle);
+ bool success = (ev.success != 0);
+ try
+ {
+ using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
+ {
+ HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+ }
+ }
+ catch (Exception e)
{
- HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+ Logger.Error(e, "Exception occured while invoking completion delegate.");
}
}
- catch (Exception e)
+ finally
{
- Logger.Error(e, "Exception occured while invoking completion delegate.");
+ ctx.Recycle();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 1e6f1fba37..83385ad7d3 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -38,15 +38,18 @@ namespace Grpc.Core.Internal
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
+ IObjectPool<BatchContextSafeHandle> ownedByPool;
CompletionCallbackData completionCallbackData;
private BatchContextSafeHandle()
{
}
- public static BatchContextSafeHandle Create()
+ public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null)
{
- return Native.grpcsharp_batch_context_create();
+ var ctx = Native.grpcsharp_batch_context_create();
+ ctx.ownedByPool = ownedByPool;
+ return ctx;
}
public IntPtr Handle
@@ -104,6 +107,19 @@ namespace Grpc.Core.Internal
return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0;
}
+ public void Recycle()
+ {
+ if (ownedByPool != null)
+ {
+ Native.grpcsharp_batch_context_reset(this);
+ ownedByPool.Return(this);
+ }
+ else
+ {
+ Dispose();
+ }
+ }
+
protected override bool ReleaseHandle()
{
Native.grpcsharp_batch_context_destroy(handle);
@@ -123,7 +139,7 @@ namespace Grpc.Core.Internal
finally
{
completionCallbackData = default(CompletionCallbackData);
- Dispose();
+ Recycle();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index d6a5ba586b..a3ef3e61ee 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -70,8 +70,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags)
.CheckOk();
}
@@ -87,8 +86,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback);
Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
@@ -97,8 +95,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk();
}
}
@@ -107,8 +104,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback);
Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk();
}
}
@@ -117,8 +113,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk();
}
}
@@ -127,8 +122,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk();
}
}
@@ -138,9 +132,8 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendStatusFromServerCompletionCallback, callback);
var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback);
var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail);
Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0,
optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
@@ -151,8 +144,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedMessageCallback, callback);
Native.grpcsharp_call_recv_message(this, ctx).CheckOk();
}
}
@@ -161,8 +153,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedResponseHeadersCallback, callback);
Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk();
}
}
@@ -171,8 +162,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedCloseOnServerCallback, callback);
Native.grpcsharp_call_start_serverside(this, ctx).CheckOk();
}
}
@@ -181,8 +171,7 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback);
Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 1eeb0e3d97..cd5f8ed92e 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -66,8 +66,7 @@ namespace Grpc.Core.Internal
public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState)
{
- var ctx = BatchContextSafeHandle.Create();
- cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState);
+ var ctx = cq.CompletionRegistry.RegisterBatchCompletion(callback, callbackState);
Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx);
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index 851b6ca213..ab649ee766 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -49,19 +49,19 @@ namespace Grpc.Core.Internal
public async Task<bool> MoveNext(CancellationToken token)
{
- if (token != CancellationToken.None)
+ var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null;
+ using (cancellationTokenRegistration)
{
- throw new InvalidOperationException("Cancellation of individual reads is not supported.");
- }
- var result = await call.ReadMessageAsync().ConfigureAwait(false);
- this.current = result;
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
+ this.current = result;
- if (result == null)
- {
- await call.StreamingResponseCallFinishedTask.ConfigureAwait(false);
- return false;
+ if (result == null)
+ {
+ await call.StreamingResponseCallFinishedTask.ConfigureAwait(false);
+ return false;
+ }
+ return true;
}
- return true;
}
public void Dispose()
diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
index b68655b33c..cf3f3c0995 100644
--- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
+++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs
@@ -36,13 +36,15 @@ namespace Grpc.Core.Internal
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>();
readonly GrpcEnvironment environment;
+ readonly Func<BatchContextSafeHandle> batchContextFactory;
readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer());
SpinLock spinLock = new SpinLock(Debugger.IsAttached);
IntPtr lastRegisteredKey; // only for testing
- public CompletionRegistry(GrpcEnvironment environment)
+ public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory)
{
- this.environment = environment;
+ this.environment = GrpcPreconditions.CheckNotNull(environment);
+ this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory);
}
public void Register(IntPtr key, IOpCompletionCallback callback)
@@ -63,10 +65,12 @@ namespace Grpc.Core.Internal
}
}
- public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state)
+ public BatchContextSafeHandle RegisterBatchCompletion(BatchCompletionDelegate callback, object state)
{
+ var ctx = batchContextFactory();
ctx.SetCompletionCallback(callback, state);
Register(ctx.Handle, ctx);
+ return ctx;
}
public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback)
diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs
new file mode 100644
index 0000000000..2f030f3e02
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs
@@ -0,0 +1,196 @@
+#region Copyright notice and license
+
+// Copyright 2017 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Pool of objects that combines a shared pool and a thread local pool.
+ /// </summary>
+ internal class DefaultObjectPool<T> : IObjectPool<T>
+ where T : class, IDisposable
+ {
+ readonly object myLock = new object();
+ readonly Func<T> itemFactory;
+
+ // Queue shared between threads, access needs to be synchronized.
+ readonly Queue<T> sharedQueue;
+ readonly int sharedCapacity;
+
+ readonly ThreadLocal<ThreadLocalData> threadLocalData;
+ readonly int threadLocalCapacity;
+ readonly int rentLimit;
+
+ bool disposed;
+
+ /// <summary>
+ /// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity.
+ /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately
+ /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected
+ /// after the thread that owns them has finished).
+ /// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease
+ /// operations.
+ /// </summary>
+ public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity)
+ {
+ GrpcPreconditions.CheckArgument(sharedCapacity >= 0);
+ GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0);
+ this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory));
+ this.sharedQueue = new Queue<T>(sharedCapacity);
+ this.sharedCapacity = sharedCapacity;
+ this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false);
+ this.threadLocalCapacity = threadLocalCapacity;
+ this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1;
+ }
+
+ /// <summary>
+ /// Leases an item from the pool or creates a new instance if the pool is empty.
+ /// Attempts to retrieve the item from the thread local pool first.
+ /// If the thread local pool is empty, the item is taken from the shared pool
+ /// along with more items that are moved to the thread local pool to avoid
+ /// prevent acquiring the lock for shared pool too often.
+ /// The methods should not be called after the pool is disposed, but it won't
+ /// results in an error to do so (after depleting the items potentially left
+ /// in the thread local pool, it will continue returning new objects created by the factory).
+ /// </summary>
+ public T Lease()
+ {
+ var localData = threadLocalData.Value;
+ if (localData.Queue.Count > 0)
+ {
+ return localData.Queue.Dequeue();
+ }
+ if (localData.CreateBudget > 0)
+ {
+ localData.CreateBudget --;
+ return itemFactory();
+ }
+
+ int itemsMoved = 0;
+ T leasedItem = null;
+ lock(myLock)
+ {
+ if (sharedQueue.Count > 0)
+ {
+ leasedItem = sharedQueue.Dequeue();
+ }
+ while (sharedQueue.Count > 0 && itemsMoved < rentLimit)
+ {
+ localData.Queue.Enqueue(sharedQueue.Dequeue());
+ itemsMoved ++;
+ }
+ }
+
+ // If the shared pool didn't contain all rentLimit items,
+ // next time we try to lease we will just create those
+ // instead of trying to grab them from the shared queue.
+ // This is to guarantee we won't be accessing the shared queue too often.
+ localData.CreateBudget = rentLimit - itemsMoved;
+
+ return leasedItem ?? itemFactory();
+ }
+
+ /// <summary>
+ /// Returns an item to the pool.
+ /// Attempts to add the item to the thread local pool first.
+ /// If the thread local pool is full, item is added to a shared pool,
+ /// along with half of the items for the thread local pool, which
+ /// should prevent acquiring the lock for shared pool too often.
+ /// If called after the pool is disposed, we make best effort not to
+ /// add anything to the thread local pool and we guarantee not to add
+ /// anything to the shared pool (items will be disposed instead).
+ /// </summary>
+ public void Return(T item)
+ {
+ GrpcPreconditions.CheckNotNull(item);
+
+ var localData = threadLocalData.Value;
+ if (localData.Queue.Count < threadLocalCapacity && !disposed)
+ {
+ localData.Queue.Enqueue(item);
+ return;
+ }
+ if (localData.DisposeBudget > 0)
+ {
+ localData.DisposeBudget --;
+ item.Dispose();
+ return;
+ }
+
+ int itemsReturned = 0;
+ int returnLimit = rentLimit + 1;
+ lock (myLock)
+ {
+ if (sharedQueue.Count < sharedCapacity && !disposed)
+ {
+ sharedQueue.Enqueue(item);
+ itemsReturned ++;
+ }
+ while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed)
+ {
+ sharedQueue.Enqueue(localData.Queue.Dequeue());
+ itemsReturned ++;
+ }
+ }
+
+ // If the shared pool could not accomodate all returnLimit items,
+ // next time we try to return we will just dispose the item
+ // instead of trying to return them to the shared queue.
+ // This is to guarantee we won't be accessing the shared queue too often.
+ localData.DisposeBudget = returnLimit - itemsReturned;
+
+ if (itemsReturned == 0)
+ {
+ localData.DisposeBudget --;
+ item.Dispose();
+ }
+ }
+
+ public void Dispose()
+ {
+ lock (myLock)
+ {
+ if (!disposed)
+ {
+ disposed = true;
+
+ while (sharedQueue.Count > 0)
+ {
+ sharedQueue.Dequeue().Dispose();
+ }
+ }
+ }
+ }
+
+ class ThreadLocalData
+ {
+ public ThreadLocalData(int capacity)
+ {
+ this.Queue = new Queue<T>(capacity);
+ }
+
+ public Queue<T> Queue { get; }
+ public int CreateBudget { get; set; }
+ public int DisposeBudget { get; set; }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index bd0229a9dd..f1b5a4f9ff 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -219,7 +219,7 @@ namespace Grpc.Core.Internal
var list = new List<CompletionQueueSafeHandle>();
for (int i = 0; i < completionQueueCount; i++)
{
- var completionRegistry = new CompletionRegistry(environment);
+ var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease());
list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry));
}
return list.AsReadOnly();
diff --git a/src/csharp/Grpc.Core/Internal/IObjectPool.cs b/src/csharp/Grpc.Core/Internal/IObjectPool.cs
new file mode 100644
index 0000000000..f7d6e30a2a
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/IObjectPool.cs
@@ -0,0 +1,35 @@
+#region Copyright notice and license
+
+// Copyright 2017 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Pool of objects.
+ /// </summary>
+ internal interface IObjectPool<T> : IDisposable
+ where T : class
+ {
+ T Lease();
+ void Return(T item);
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index d517252cfe..43acb8f915 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -52,6 +52,7 @@ namespace Grpc.Core.Internal
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled;
+ public readonly Delegates.grpcsharp_batch_context_reset_delegate grpcsharp_batch_context_reset;
public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy;
public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create;
@@ -169,6 +170,7 @@ namespace Grpc.Core.Internal
this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library);
this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate>(library);
+ this.grpcsharp_batch_context_reset = GetMethodDelegate<Delegates.grpcsharp_batch_context_reset_delegate>(library);
this.grpcsharp_batch_context_destroy = GetMethodDelegate<Delegates.grpcsharp_batch_context_destroy_delegate>(library);
this.grpcsharp_request_call_context_create = GetMethodDelegate<Delegates.grpcsharp_request_call_context_create_delegate>(library);
@@ -311,6 +313,7 @@ namespace Grpc.Core.Internal
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength);
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx);
public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx);
+ public delegate void grpcsharp_batch_context_reset_delegate(BatchContextSafeHandle ctx);
public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx);
public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate();
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index c65b960afb..058dddb7eb 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -49,13 +49,14 @@ namespace Grpc.Core.Internal
public async Task<bool> MoveNext(CancellationToken token)
{
- if (token != CancellationToken.None)
+
+ var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null;
+ using (cancellationTokenRegistration)
{
- throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
+ this.current = result;
+ return result != null;
}
- var result = await call.ReadMessageAsync().ConfigureAwait(false);
- this.current = result;
- return result != null;
}
public void Dispose()
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index a308890cde..9b7ea884dd 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -64,10 +64,9 @@ namespace Grpc.Core.Internal
{
using (completionQueue.NewScope())
{
- var ctx = BatchContextSafeHandle.Create();
// TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object,
// but server shutdown isn't worth optimizing right now.
- completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null);
+ var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(callback, null);
Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx);
}
}
diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include
index b9ceaf8254..2d9e4ba16a 100755
--- a/src/csharp/Grpc.Core/Version.csproj.include
+++ b/src/csharp/Grpc.Core/Version.csproj.include
@@ -1,7 +1,7 @@
<!-- This file is generated -->
<Project>
<PropertyGroup>
- <GrpcCsharpVersion>1.8.0-dev</GrpcCsharpVersion>
+ <GrpcCsharpVersion>1.9.0-dev</GrpcCsharpVersion>
<GoogleProtobufVersion>3.3.0</GoogleProtobufVersion>
</PropertyGroup>
</Project>
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index dab938821f..9b5da1c947 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -33,11 +33,11 @@ namespace Grpc.Core
/// <summary>
/// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies
/// </summary>
- public const string CurrentAssemblyFileVersion = "1.8.0.0";
+ public const string CurrentAssemblyFileVersion = "1.9.0.0";
/// <summary>
/// Current version of gRPC C#
/// </summary>
- public const string CurrentVersion = "1.8.0-dev";
+ public const string CurrentVersion = "1.9.0-dev";
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index 48905a2715..9d41d34414 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -131,7 +131,7 @@ namespace Grpc.IntegrationTesting
readonly List<Task> runnerTasks;
readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
- readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
+ readonly TimeStats timeStats = new TimeStats();
readonly AtomicCounter statsResetCount = new AtomicCounter();
public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams, Func<BasicProfiler> profilerFactory)
@@ -165,7 +165,7 @@ namespace Grpc.IntegrationTesting
hist.GetSnapshot(histogramData, reset);
}
- var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
+ var timeSnapshot = timeStats.GetSnapshot(reset);
if (reset)
{
@@ -173,15 +173,14 @@ namespace Grpc.IntegrationTesting
}
GrpcEnvironment.Logger.Info("[ClientRunnerImpl.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (histogram reset count:{3}, seconds since reset: {4})",
- GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), statsResetCount.Count, secondsElapsed);
+ GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), statsResetCount.Count, timeSnapshot.WallClockTime.TotalSeconds);
- // TODO: populate user time and system time
return new ClientStats
{
Latencies = histogramData,
- TimeElapsed = secondsElapsed,
- TimeUser = 0,
- TimeSystem = 0
+ TimeElapsed = timeSnapshot.WallClockTime.TotalSeconds,
+ TimeUser = timeSnapshot.UserProcessorTime.TotalSeconds,
+ TimeSystem = timeSnapshot.PrivilegedProcessorTime.TotalSeconds
};
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
index e1b47744d5..ea29bd74e5 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
@@ -117,7 +117,7 @@ namespace Grpc.IntegrationTesting
public class ServerRunnerImpl : IServerRunner
{
readonly Server server;
- readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
+ readonly TimeStats timeStats = new TimeStats();
public ServerRunnerImpl(Server server)
{
@@ -138,17 +138,16 @@ namespace Grpc.IntegrationTesting
/// <returns>The stats.</returns>
public ServerStats GetStats(bool reset)
{
- var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
+ var timeSnapshot = timeStats.GetSnapshot(reset);
GrpcEnvironment.Logger.Info("[ServerRunner.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (seconds since last reset {3})",
- GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), secondsElapsed);
+ GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), timeSnapshot.WallClockTime.TotalSeconds);
- // TODO: populate user time and system time
return new ServerStats
{
- TimeElapsed = secondsElapsed,
- TimeUser = 0,
- TimeSystem = 0
+ TimeElapsed = timeSnapshot.WallClockTime.TotalSeconds,
+ TimeUser = timeSnapshot.UserProcessorTime.TotalSeconds,
+ TimeSystem = timeSnapshot.PrivilegedProcessorTime.TotalSeconds
};
}
diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
index 11956e4ac8..0c62380768 100644
--- a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
@@ -243,7 +243,7 @@ namespace Grpc.IntegrationTesting
const string GaugeName = "csharp_overall_qps";
readonly Histogram histogram;
- readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
+ readonly TimeStats timeStats = new TimeStats();
public MetricsServiceImpl(Histogram histogram)
{
@@ -280,9 +280,9 @@ namespace Grpc.IntegrationTesting
long GetQpsAndReset()
{
var snapshot = histogram.GetSnapshot(true);
- var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true);
+ var timeSnapshot = timeStats.GetSnapshot(true);
- return (long) (snapshot.Count / elapsedSnapshot.TotalSeconds);
+ return (long) (snapshot.Count / timeSnapshot.WallClockTime.TotalSeconds);
}
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/TimeStats.cs b/src/csharp/Grpc.IntegrationTesting/TimeStats.cs
new file mode 100644
index 0000000000..6aba04c194
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting/TimeStats.cs
@@ -0,0 +1,90 @@
+#region Copyright notice and license
+
+// Copyright 2015 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Grpc.Core;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+using Grpc.Testing;
+
+namespace Grpc.IntegrationTesting
+{
+ /// <summary>
+ /// Snapshottable time statistics.
+ /// </summary>
+ public class TimeStats
+ {
+ readonly object myLock = new object();
+ DateTime lastWallClock;
+ TimeSpan lastUserTime;
+ TimeSpan lastPrivilegedTime;
+
+ public TimeStats()
+ {
+ lastWallClock = DateTime.UtcNow;
+ lastUserTime = Process.GetCurrentProcess().UserProcessorTime;
+ lastPrivilegedTime = Process.GetCurrentProcess().PrivilegedProcessorTime;
+ }
+
+ public Snapshot GetSnapshot(bool reset)
+ {
+ lock (myLock)
+ {
+ var wallClock = DateTime.UtcNow;
+ var userTime = Process.GetCurrentProcess().UserProcessorTime;
+ var privilegedTime = Process.GetCurrentProcess().PrivilegedProcessorTime;
+ var snapshot = new Snapshot(wallClock - lastWallClock, userTime - lastUserTime, privilegedTime - lastPrivilegedTime);
+
+ if (reset)
+ {
+ lastWallClock = wallClock;
+ lastUserTime = userTime;
+ lastPrivilegedTime = privilegedTime;
+ }
+ return snapshot;
+ }
+ }
+
+ public class Snapshot
+ {
+ public TimeSpan WallClockTime { get; }
+ public TimeSpan UserProcessorTime { get; }
+ public TimeSpan PrivilegedProcessorTime { get; }
+
+ public Snapshot(TimeSpan wallClockTime, TimeSpan userProcessorTime, TimeSpan privilegedProcessorTime)
+ {
+ this.WallClockTime = wallClockTime;
+ this.UserProcessorTime = userProcessorTime;
+ this.PrivilegedProcessorTime = privilegedProcessorTime;
+ }
+
+ public override string ToString()
+ {
+ return string.Format("[TimeStats.Snapshot: wallClock {0}, userProcessor {1}, privilegedProcessor {2}]", WallClockTime, UserProcessorTime, PrivilegedProcessorTime);
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.IntegrationTesting/WallClockStopwatch.cs b/src/csharp/Grpc.IntegrationTesting/WallClockStopwatch.cs
deleted file mode 100644
index 38b58f296c..0000000000
--- a/src/csharp/Grpc.IntegrationTesting/WallClockStopwatch.cs
+++ /dev/null
@@ -1,63 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015 gRPC authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#endregion
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.IO;
-using System.Linq;
-using System.Text.RegularExpressions;
-using System.Threading;
-using System.Threading.Tasks;
-using Google.Protobuf;
-using Grpc.Core;
-using Grpc.Core.Utils;
-using NUnit.Framework;
-using Grpc.Testing;
-
-namespace Grpc.IntegrationTesting
-{
- /// <summary>
- /// Snapshottable wall clock stopwatch.
- /// </summary>
- public class WallClockStopwatch
- {
- long startTicks;
-
- public WallClockStopwatch()
- {
- this.startTicks = DateTime.UtcNow.Ticks;
- }
-
- public TimeSpan GetElapsedSnapshot(bool reset)
- {
- var utcNow = DateTime.UtcNow;
-
- long oldStartTicks;
- if (reset)
- {
- oldStartTicks = Interlocked.Exchange(ref this.startTicks, utcNow.Ticks);
- }
- else
- {
- oldStartTicks = this.startTicks;
- }
- return utcNow - new DateTime(oldStartTicks, DateTimeKind.Utc);
- }
- }
-}
diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
index 2d1c33e9a0..eefdb50e39 100644
--- a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
+++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs
@@ -43,7 +43,7 @@ namespace Grpc.Microbenchmarks
public void Run(int threadCount, int iterations, bool useSharedRegistry)
{
Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry));
- CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment) : null;
+ CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()) : null;
var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry));
threadedBenchmark.Run();
// TODO: parametrize by number of pending completions
@@ -51,7 +51,7 @@ namespace Grpc.Microbenchmarks
private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry)
{
- var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment);
+ var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create());
var ctx = BatchContextSafeHandle.Create();
var stopwatch = Stopwatch.StartNew();
@@ -64,7 +64,7 @@ namespace Grpc.Microbenchmarks
stopwatch.Stop();
Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds);
- ctx.Dispose();
+ ctx.Recycle();
}
private class NopCompletionCallback : IOpCompletionCallback
diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
index 9cff97eb88..da4f35ff96 100644
--- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
+++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs
@@ -52,10 +52,7 @@ namespace Grpc.Microbenchmarks
private void ThreadBody(int iterations, int payloadSize)
{
- // TODO(jtattermusch): parametrize by number of pending completions.
- // TODO(jtattermusch): parametrize by cached/non-cached BatchContextSafeHandle
-
- var completionRegistry = new CompletionRegistry(environment);
+ var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease());
var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry);
var call = CreateFakeCall(cq);
diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat
index ff013d5680..8f89e2846a 100755
--- a/src/csharp/build_packages_dotnetcli.bat
+++ b/src/csharp/build_packages_dotnetcli.bat
@@ -13,7 +13,7 @@
@rem limitations under the License.
@rem Current package versions
-set VERSION=1.8.0-dev
+set VERSION=1.9.0-dev
@rem Adjust the location of nuget.exe
set NUGET=C:\nuget\nuget.exe
diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh
index 44a4791146..6a6cafe2bd 100755
--- a/src/csharp/build_packages_dotnetcli.sh
+++ b/src/csharp/build_packages_dotnetcli.sh
@@ -39,7 +39,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts
dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts
dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts
-nuget pack Grpc.nuspec -Version "1.8.0-dev" -OutputDirectory ../../artifacts
-nuget pack Grpc.Tools.nuspec -Version "1.8.0-dev" -OutputDirectory ../../artifacts
+nuget pack Grpc.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts
+nuget pack Grpc.Tools.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts
(cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg)
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index bcb3bfaee5..24d779e1e5 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -197,10 +197,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array* dest,
}
GPR_EXPORT void GPR_CALLTYPE
-grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) {
- if (!ctx) {
- return;
- }
+grpcsharp_batch_context_reset(grpcsharp_batch_context* ctx) {
grpcsharp_metadata_array_destroy_metadata_including_entries(
&(ctx->send_initial_metadata));
@@ -216,7 +213,15 @@ grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) {
grpcsharp_metadata_array_destroy_metadata_only(
&(ctx->recv_status_on_client.trailing_metadata));
grpc_slice_unref(ctx->recv_status_on_client.status_details);
+ memset(ctx, 0, sizeof(grpcsharp_batch_context));
+}
+GPR_EXPORT void GPR_CALLTYPE
+grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) {
+ if (!ctx) {
+ return;
+ }
+ grpcsharp_batch_context_reset(ctx);
gpr_free(ctx);
}
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index 7841051052..82573edecb 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -5,11 +5,13 @@
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",
"Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest",
+ "Grpc.Core.Internal.Tests.DefaultObjectPoolTest",
"Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest",
"Grpc.Core.Internal.Tests.TimespecTest",
"Grpc.Core.Tests.AppDomainUnloadTest",
"Grpc.Core.Tests.AuthContextTest",
"Grpc.Core.Tests.AuthPropertyTest",
+ "Grpc.Core.Tests.CallCancellationTest",
"Grpc.Core.Tests.CallCredentialsTest",
"Grpc.Core.Tests.CallOptionsTest",
"Grpc.Core.Tests.ChannelCredentialsTest",