diff options
author | David Garcia Quintas <dgq@google.com> | 2017-12-04 16:25:01 -0800 |
---|---|---|
committer | David Garcia Quintas <dgq@google.com> | 2017-12-04 16:25:01 -0800 |
commit | 6667f15d8e4ea915bd64647e2c7949d37d1bad91 (patch) | |
tree | de1f6de5029d7c51ee369efa5294e00736014597 /src/csharp | |
parent | 0f91e513d9dc9bee529701ba254933eb7be07b38 (diff) | |
parent | 6fa206de8f8a1444fff19a84945a424c0cabb41c (diff) |
Merge branch 'master' of github.com:grpc/grpc into backoff_cpp
Diffstat (limited to 'src/csharp')
31 files changed, 731 insertions, 226 deletions
diff --git a/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs new file mode 100644 index 0000000000..e040f52380 --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/CallCancellationTest.cs @@ -0,0 +1,182 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Profiling; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class CallCancellationTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + + [Test] + public async Task ClientStreamingCall_CancelAfterBegin() + { + var barrier = new TaskCompletionSource<object>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + barrier.SetResult(null); + await requestStream.ToListAsync(); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await barrier.Task; // make sure the handler has started. + cts.Cancel(); + + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + + [Test] + public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() + { + var handlerStartedBarrier = new TaskCompletionSource<object>(); + var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); + var successTcs = new TaskCompletionSource<string>(); + + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + handlerStartedBarrier.SetResult(null); + + // wait for cancellation to be delivered. + context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); + await cancelNotificationReceivedBarrier.Task; + + var moveNextResult = await requestStream.MoveNext(); + successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); + return ""; + }); + + var cts = new CancellationTokenSource(); + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); + + await handlerStartedBarrier.Task; + cts.Cancel(); + + try + { + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + Assert.AreEqual("SUCCESS", await successTcs.Task); + } + + [Test] + public async Task ClientStreamingCall_CancelServerSideRead() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + var cts = new CancellationTokenSource(); + var moveNextTask = requestStream.MoveNext(cts.Token); + cts.Cancel(); + await moveNextTask; + return ""; + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + + [Test] + public async Task ServerStreamingCall_CancelClientSideRead() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + await responseStream.WriteAsync("abc"); + while (!context.CancellationToken.IsCancellationRequested) + { + await Task.Delay(10); + } + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + await call.ResponseStream.MoveNext(); + Assert.AreEqual("abc", call.ResponseStream.Current); + + var cts = new CancellationTokenSource(); + var moveNextTask = call.ResponseStream.MoveNext(cts.Token); + cts.Cancel(); + + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await moveNextTask; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index 72d9035a6f..90dd365b07 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -273,74 +273,6 @@ namespace Grpc.Core.Tests } [Test] - public async Task ClientStreamingCall_CancelAfterBegin() - { - var barrier = new TaskCompletionSource<object>(); - - helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => - { - barrier.SetResult(null); - await requestStream.ToListAsync(); - return ""; - }); - - var cts = new CancellationTokenSource(); - var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); - - await barrier.Task; // make sure the handler has started. - cts.Cancel(); - - try - { - // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. - await call.ResponseAsync; - Assert.Fail(); - } - catch (RpcException ex) - { - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - } - } - - [Test] - public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull() - { - var handlerStartedBarrier = new TaskCompletionSource<object>(); - var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>(); - var successTcs = new TaskCompletionSource<string>(); - - helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => - { - handlerStartedBarrier.SetResult(null); - - // wait for cancellation to be delivered. - context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null)); - await cancelNotificationReceivedBarrier.Task; - - var moveNextResult = await requestStream.MoveNext(); - successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL"); - return ""; - }); - - var cts = new CancellationTokenSource(); - var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); - - await handlerStartedBarrier.Task; - cts.Cancel(); - - try - { - await call.ResponseAsync; - Assert.Fail(); - } - catch (RpcException ex) - { - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - } - Assert.AreEqual("SUCCESS", await successTcs.Task); - } - - [Test] public async Task AsyncUnaryCall_EchoMetadata() { helper.UnaryHandler = new UnaryServerMethod<string, string>((request, context) => diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs index 1d9475a8b8..775c950c8c 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal.Tests public void CreateAsyncAndShutdown() { var env = GrpcEnvironment.AddRef(); - var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env)); + var cq = CompletionQueueSafeHandle.CreateAsync(new CompletionRegistry(env, () => BatchContextSafeHandle.Create())); cq.Shutdown(); var ev = cq.Next(); cq.Dispose(); diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs new file mode 100644 index 0000000000..b6bb0a9eae --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultObjectPoolTest.cs @@ -0,0 +1,79 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class DefaultObjectPoolTest + { + [Test] + [TestCase(10, 2)] + [TestCase(10, 1)] + [TestCase(0, 2)] + [TestCase(2, 0)] + public void ObjectIsReused(int sharedCapacity, int threadLocalCapacity) + { + var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), sharedCapacity, threadLocalCapacity); + var origLeased = pool.Lease(); + pool.Return(origLeased); + Assert.AreSame(origLeased, pool.Lease()); + Assert.AreNotSame(origLeased, pool.Lease()); + } + + [Test] + public void ZeroCapacities() + { + var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 0, 0); + var origLeased = pool.Lease(); + pool.Return(origLeased); + Assert.AreNotSame(origLeased, pool.Lease()); + } + + [Test] + public void DisposeCleansSharedPool() + { + var pool = new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, 0); + var origLeased = pool.Lease(); + pool.Return(origLeased); + pool.Dispose(); + Assert.AreNotSame(origLeased, pool.Lease()); + } + + [Test] + public void Constructor() + { + Assert.Throws<ArgumentNullException>(() => new DefaultObjectPool<TestPooledObject>(null, 10, 2)); + Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), -1, 10)); + Assert.Throws<ArgumentException>(() => new DefaultObjectPool<TestPooledObject>(() => new TestPooledObject(), 10, -1)); + } + + class TestPooledObject : IDisposable + { + + public void Dispose() + { + + } + } + } +} diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 80031cb7ef..2b1b5e32d7 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -33,6 +33,8 @@ namespace Grpc.Core public class GrpcEnvironment { const int MinDefaultThreadPoolSize = 4; + const int DefaultBatchContextPoolSharedCapacity = 10000; + const int DefaultBatchContextPoolThreadLocalCapacity = 64; static object staticLock = new object(); static GrpcEnvironment instance; @@ -40,11 +42,14 @@ namespace Grpc.Core static int? customThreadPoolSize; static int? customCompletionQueueCount; static bool inlineHandlers; + static int batchContextPoolSharedCapacity = DefaultBatchContextPoolSharedCapacity; + static int batchContextPoolThreadLocalCapacity = DefaultBatchContextPoolThreadLocalCapacity; static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>(); static readonly HashSet<Server> registeredServers = new HashSet<Server>(); static ILogger logger = new LogLevelFilterLogger(new ConsoleLogger(), LogLevel.Off, true); + readonly IObjectPool<BatchContextSafeHandle> batchContextPool; readonly GrpcThreadPool threadPool; readonly DebugStats debugStats = new DebugStats(); readonly AtomicCounter cqPickerCounter = new AtomicCounter(); @@ -186,7 +191,7 @@ namespace Grpc.Core /// <summary> /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events. - /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing. /// Most users should rely on the default value provided by gRPC library. /// Note: this method is part of an experimental API that can change or be removed without any prior notice. @@ -203,7 +208,7 @@ namespace Grpc.Core /// <summary> /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events. - /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing. /// Most users should rely on the default value provided by gRPC library. /// Note: this method is part of an experimental API that can change or be removed without any prior notice. @@ -238,6 +243,26 @@ namespace Grpc.Core } /// <summary> + /// Sets the parameters for a pool that caches batch context instances. Reusing batch context instances + /// instead of creating a new one for every C core operation helps reducing the GC pressure. + /// Can be only invoked before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// This is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetBatchContextPoolParams(int sharedCapacity, int threadLocalCapacity) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(sharedCapacity >= 0, "Shared capacity needs to be a non-negative number"); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0, "Thread local capacity needs to be a non-negative number"); + batchContextPoolSharedCapacity = sharedCapacity; + batchContextPoolThreadLocalCapacity = threadLocalCapacity; + } + } + + /// <summary> /// Occurs when <c>GrpcEnvironment</c> is about the start the shutdown logic. /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first). /// </summary> @@ -249,6 +274,7 @@ namespace Grpc.Core private GrpcEnvironment() { GrpcNativeInit(); + batchContextPool = new DefaultObjectPool<BatchContextSafeHandle>(() => BatchContextSafeHandle.Create(this.batchContextPool), batchContextPoolSharedCapacity, batchContextPoolThreadLocalCapacity); threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers); threadPool.Start(); } @@ -264,6 +290,8 @@ namespace Grpc.Core } } + internal IObjectPool<BatchContextSafeHandle> BatchContextPool => batchContextPool; + internal bool IsAlive { get @@ -325,6 +353,7 @@ namespace Grpc.Core await Task.Run(() => ShuttingDown?.Invoke(this, null)).ConfigureAwait(false); await threadPool.StopAsync().ConfigureAwait(false); + batchContextPool.Dispose(); GrpcNativeShutdown(); isShutdown = true; diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs index 42bfbb87e0..3751d549e3 100644 --- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs +++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs @@ -41,6 +41,13 @@ namespace Grpc.Core /// (<c>MoveNext</c> will return <c>false</c>) and the <c>CancellationToken</c> /// associated with the call will be cancelled to signal the failure. /// </para> + /// <para> + /// <c>MoveNext()</c> operations can be cancelled via a cancellation token. Cancelling + /// an individual read operation has the same effect as cancelling the entire call + /// (which will also result in the read operation returning prematurely), but the per-read cancellation + /// tokens passed to MoveNext() only result in cancelling the call if the read operation haven't finished + /// yet. + /// </para> /// </summary> /// <typeparam name="T">The message type.</typeparam> public interface IAsyncStreamReader<T> : IAsyncEnumerator<T> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index aa2161267a..9946d1a6cf 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -92,23 +92,28 @@ namespace Grpc.Core.Internal } using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) - using (var ctx = BatchContextSafeHandle.Create()) { - call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); - - var ev = cq.Pluck(ctx.Handle); - - bool success = (ev.success != 0); + var ctx = details.Channel.Environment.BatchContextPool.Lease(); try { - using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + call.StartUnary(ctx, payload, GetWriteFlagsForCall(), metadataArray, details.Options.Flags); + var ev = cq.Pluck(ctx.Handle); + bool success = (ev.success != 0); + try + { + using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch")) + { + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + } + } + catch (Exception e) { - HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); + Logger.Error(e, "Exception occured while invoking completion delegate."); } } - catch (Exception e) + finally { - Logger.Error(e, "Exception occured while invoking completion delegate."); + ctx.Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 1e6f1fba37..83385ad7d3 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -38,15 +38,18 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>(); + IObjectPool<BatchContextSafeHandle> ownedByPool; CompletionCallbackData completionCallbackData; private BatchContextSafeHandle() { } - public static BatchContextSafeHandle Create() + public static BatchContextSafeHandle Create(IObjectPool<BatchContextSafeHandle> ownedByPool = null) { - return Native.grpcsharp_batch_context_create(); + var ctx = Native.grpcsharp_batch_context_create(); + ctx.ownedByPool = ownedByPool; + return ctx; } public IntPtr Handle @@ -104,6 +107,19 @@ namespace Grpc.Core.Internal return Native.grpcsharp_batch_context_recv_close_on_server_cancelled(this) != 0; } + public void Recycle() + { + if (ownedByPool != null) + { + Native.grpcsharp_batch_context_reset(this); + ownedByPool.Return(this); + } + else + { + Dispose(); + } + } + protected override bool ReleaseHandle() { Native.grpcsharp_batch_context_destroy(handle); @@ -123,7 +139,7 @@ namespace Grpc.Core.Internal finally { completionCallbackData = default(CompletionCallbackData); - Dispose(); + Recycle(); } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index d6a5ba586b..a3ef3e61ee 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -70,8 +70,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags) .CheckOk(); } @@ -87,8 +86,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IUnaryResponseClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IUnaryResponseClientCallback, callback); Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -97,8 +95,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, metadataArray, callFlags).CheckOk(); } } @@ -107,8 +104,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedStatusOnClientCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedStatusOnClientCallback, callback); Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray, callFlags).CheckOk(); } } @@ -117,8 +113,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata ? 1 : 0).CheckOk(); } } @@ -127,8 +122,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } } @@ -138,9 +132,8 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendStatusFromServerCompletionCallback, callback); var statusDetailBytes = MarshalUtils.GetBytesUTF8(status.Detail); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, statusDetailBytes, new UIntPtr((ulong)statusDetailBytes.Length), metadataArray, sendEmptyInitialMetadata ? 1 : 0, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); @@ -151,8 +144,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedMessageCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedMessageCallback, callback); Native.grpcsharp_call_recv_message(this, ctx).CheckOk(); } } @@ -161,8 +153,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedResponseHeadersCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedResponseHeadersCallback, callback); Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); } } @@ -171,8 +162,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_IReceivedCloseOnServerCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_IReceivedCloseOnServerCallback, callback); Native.grpcsharp_call_start_serverside(this, ctx).CheckOk(); } } @@ -181,8 +171,7 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, CompletionHandler_ISendCompletionCallback, callback); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(CompletionHandler_ISendCompletionCallback, callback); Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 1eeb0e3d97..cd5f8ed92e 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -66,8 +66,7 @@ namespace Grpc.Core.Internal public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback, object callbackState) { - var ctx = BatchContextSafeHandle.Create(); - cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback, callbackState); + var ctx = cq.CompletionRegistry.RegisterBatchCompletion(callback, callbackState); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index 851b6ca213..ab649ee766 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -49,19 +49,19 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); - } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; - if (result == null) - { - await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); - return false; + if (result == null) + { + await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); + return false; + } + return true; } - return true; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs index b68655b33c..cf3f3c0995 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionRegistry.cs @@ -36,13 +36,15 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<CompletionRegistry>(); readonly GrpcEnvironment environment; + readonly Func<BatchContextSafeHandle> batchContextFactory; readonly Dictionary<IntPtr, IOpCompletionCallback> dict = new Dictionary<IntPtr, IOpCompletionCallback>(new IntPtrComparer()); SpinLock spinLock = new SpinLock(Debugger.IsAttached); IntPtr lastRegisteredKey; // only for testing - public CompletionRegistry(GrpcEnvironment environment) + public CompletionRegistry(GrpcEnvironment environment, Func<BatchContextSafeHandle> batchContextFactory) { - this.environment = environment; + this.environment = GrpcPreconditions.CheckNotNull(environment); + this.batchContextFactory = GrpcPreconditions.CheckNotNull(batchContextFactory); } public void Register(IntPtr key, IOpCompletionCallback callback) @@ -63,10 +65,12 @@ namespace Grpc.Core.Internal } } - public void RegisterBatchCompletion(BatchContextSafeHandle ctx, BatchCompletionDelegate callback, object state) + public BatchContextSafeHandle RegisterBatchCompletion(BatchCompletionDelegate callback, object state) { + var ctx = batchContextFactory(); ctx.SetCompletionCallback(callback, state); Register(ctx.Handle, ctx); + return ctx; } public void RegisterRequestCallCompletion(RequestCallContextSafeHandle ctx, RequestCallCompletionDelegate callback) diff --git a/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs new file mode 100644 index 0000000000..2f030f3e02 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/DefaultObjectPool.cs @@ -0,0 +1,196 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Pool of objects that combines a shared pool and a thread local pool. + /// </summary> + internal class DefaultObjectPool<T> : IObjectPool<T> + where T : class, IDisposable + { + readonly object myLock = new object(); + readonly Func<T> itemFactory; + + // Queue shared between threads, access needs to be synchronized. + readonly Queue<T> sharedQueue; + readonly int sharedCapacity; + + readonly ThreadLocal<ThreadLocalData> threadLocalData; + readonly int threadLocalCapacity; + readonly int rentLimit; + + bool disposed; + + /// <summary> + /// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity. + /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately + /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected + /// after the thread that owns them has finished). + /// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease + /// operations. + /// </summary> + public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity) + { + GrpcPreconditions.CheckArgument(sharedCapacity >= 0); + GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); + this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); + this.sharedQueue = new Queue<T>(sharedCapacity); + this.sharedCapacity = sharedCapacity; + this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false); + this.threadLocalCapacity = threadLocalCapacity; + this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1; + } + + /// <summary> + /// Leases an item from the pool or creates a new instance if the pool is empty. + /// Attempts to retrieve the item from the thread local pool first. + /// If the thread local pool is empty, the item is taken from the shared pool + /// along with more items that are moved to the thread local pool to avoid + /// prevent acquiring the lock for shared pool too often. + /// The methods should not be called after the pool is disposed, but it won't + /// results in an error to do so (after depleting the items potentially left + /// in the thread local pool, it will continue returning new objects created by the factory). + /// </summary> + public T Lease() + { + var localData = threadLocalData.Value; + if (localData.Queue.Count > 0) + { + return localData.Queue.Dequeue(); + } + if (localData.CreateBudget > 0) + { + localData.CreateBudget --; + return itemFactory(); + } + + int itemsMoved = 0; + T leasedItem = null; + lock(myLock) + { + if (sharedQueue.Count > 0) + { + leasedItem = sharedQueue.Dequeue(); + } + while (sharedQueue.Count > 0 && itemsMoved < rentLimit) + { + localData.Queue.Enqueue(sharedQueue.Dequeue()); + itemsMoved ++; + } + } + + // If the shared pool didn't contain all rentLimit items, + // next time we try to lease we will just create those + // instead of trying to grab them from the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.CreateBudget = rentLimit - itemsMoved; + + return leasedItem ?? itemFactory(); + } + + /// <summary> + /// Returns an item to the pool. + /// Attempts to add the item to the thread local pool first. + /// If the thread local pool is full, item is added to a shared pool, + /// along with half of the items for the thread local pool, which + /// should prevent acquiring the lock for shared pool too often. + /// If called after the pool is disposed, we make best effort not to + /// add anything to the thread local pool and we guarantee not to add + /// anything to the shared pool (items will be disposed instead). + /// </summary> + public void Return(T item) + { + GrpcPreconditions.CheckNotNull(item); + + var localData = threadLocalData.Value; + if (localData.Queue.Count < threadLocalCapacity && !disposed) + { + localData.Queue.Enqueue(item); + return; + } + if (localData.DisposeBudget > 0) + { + localData.DisposeBudget --; + item.Dispose(); + return; + } + + int itemsReturned = 0; + int returnLimit = rentLimit + 1; + lock (myLock) + { + if (sharedQueue.Count < sharedCapacity && !disposed) + { + sharedQueue.Enqueue(item); + itemsReturned ++; + } + while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed) + { + sharedQueue.Enqueue(localData.Queue.Dequeue()); + itemsReturned ++; + } + } + + // If the shared pool could not accomodate all returnLimit items, + // next time we try to return we will just dispose the item + // instead of trying to return them to the shared queue. + // This is to guarantee we won't be accessing the shared queue too often. + localData.DisposeBudget = returnLimit - itemsReturned; + + if (itemsReturned == 0) + { + localData.DisposeBudget --; + item.Dispose(); + } + } + + public void Dispose() + { + lock (myLock) + { + if (!disposed) + { + disposed = true; + + while (sharedQueue.Count > 0) + { + sharedQueue.Dequeue().Dispose(); + } + } + } + } + + class ThreadLocalData + { + public ThreadLocalData(int capacity) + { + this.Queue = new Queue<T>(capacity); + } + + public Queue<T> Queue { get; } + public int CreateBudget { get; set; } + public int DisposeBudget { get; set; } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index bd0229a9dd..f1b5a4f9ff 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -219,7 +219,7 @@ namespace Grpc.Core.Internal var list = new List<CompletionQueueSafeHandle>(); for (int i = 0; i < completionQueueCount; i++) { - var completionRegistry = new CompletionRegistry(environment); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); list.Add(CompletionQueueSafeHandle.CreateAsync(completionRegistry)); } return list.AsReadOnly(); diff --git a/src/csharp/Grpc.Core/Internal/IObjectPool.cs b/src/csharp/Grpc.Core/Internal/IObjectPool.cs new file mode 100644 index 0000000000..f7d6e30a2a --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/IObjectPool.cs @@ -0,0 +1,35 @@ +#region Copyright notice and license + +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Pool of objects. + /// </summary> + internal interface IObjectPool<T> : IDisposable + where T : class + { + T Lease(); + void Return(T item); + } +} diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index d517252cfe..43acb8f915 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -52,6 +52,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details; public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata; public readonly Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate grpcsharp_batch_context_recv_close_on_server_cancelled; + public readonly Delegates.grpcsharp_batch_context_reset_delegate grpcsharp_batch_context_reset; public readonly Delegates.grpcsharp_batch_context_destroy_delegate grpcsharp_batch_context_destroy; public readonly Delegates.grpcsharp_request_call_context_create_delegate grpcsharp_request_call_context_create; @@ -169,6 +170,7 @@ namespace Grpc.Core.Internal this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library); this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library); this.grpcsharp_batch_context_recv_close_on_server_cancelled = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_close_on_server_cancelled_delegate>(library); + this.grpcsharp_batch_context_reset = GetMethodDelegate<Delegates.grpcsharp_batch_context_reset_delegate>(library); this.grpcsharp_batch_context_destroy = GetMethodDelegate<Delegates.grpcsharp_batch_context_destroy_delegate>(library); this.grpcsharp_request_call_context_create = GetMethodDelegate<Delegates.grpcsharp_request_call_context_create_delegate>(library); @@ -311,6 +313,7 @@ namespace Grpc.Core.Internal public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength); public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx); public delegate int grpcsharp_batch_context_recv_close_on_server_cancelled_delegate(BatchContextSafeHandle ctx); + public delegate void grpcsharp_batch_context_reset_delegate(BatchContextSafeHandle ctx); public delegate void grpcsharp_batch_context_destroy_delegate(IntPtr ctx); public delegate RequestCallContextSafeHandle grpcsharp_request_call_context_create_delegate(); diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs index c65b960afb..058dddb7eb 100644 --- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs @@ -49,13 +49,14 @@ namespace Grpc.Core.Internal public async Task<bool> MoveNext(CancellationToken token) { - if (token != CancellationToken.None) + + var cancellationTokenRegistration = token.CanBeCanceled ? token.Register(() => call.Cancel()) : (IDisposable) null; + using (cancellationTokenRegistration) { - throw new InvalidOperationException("Cancellation of individual reads is not supported."); + var result = await call.ReadMessageAsync().ConfigureAwait(false); + this.current = result; + return result != null; } - var result = await call.ReadMessageAsync().ConfigureAwait(false); - this.current = result; - return result != null; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index a308890cde..9b7ea884dd 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -64,10 +64,9 @@ namespace Grpc.Core.Internal { using (completionQueue.NewScope()) { - var ctx = BatchContextSafeHandle.Create(); // TODO(jtattermusch): delegate allocation by caller can be avoided by utilizing the "state" object, // but server shutdown isn't worth optimizing right now. - completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback, null); + var ctx = completionQueue.CompletionRegistry.RegisterBatchCompletion(callback, null); Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } } diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include index b9ceaf8254..2d9e4ba16a 100755 --- a/src/csharp/Grpc.Core/Version.csproj.include +++ b/src/csharp/Grpc.Core/Version.csproj.include @@ -1,7 +1,7 @@ <!-- This file is generated --> <Project> <PropertyGroup> - <GrpcCsharpVersion>1.8.0-dev</GrpcCsharpVersion> + <GrpcCsharpVersion>1.9.0-dev</GrpcCsharpVersion> <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion> </PropertyGroup> </Project> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index dab938821f..9b5da1c947 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -33,11 +33,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "1.8.0.0"; + public const string CurrentAssemblyFileVersion = "1.9.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "1.8.0-dev"; + public const string CurrentVersion = "1.9.0-dev"; } } diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 48905a2715..9d41d34414 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -131,7 +131,7 @@ namespace Grpc.IntegrationTesting readonly List<Task> runnerTasks; readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); - readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); + readonly TimeStats timeStats = new TimeStats(); readonly AtomicCounter statsResetCount = new AtomicCounter(); public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams, Func<BasicProfiler> profilerFactory) @@ -165,7 +165,7 @@ namespace Grpc.IntegrationTesting hist.GetSnapshot(histogramData, reset); } - var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; + var timeSnapshot = timeStats.GetSnapshot(reset); if (reset) { @@ -173,15 +173,14 @@ namespace Grpc.IntegrationTesting } GrpcEnvironment.Logger.Info("[ClientRunnerImpl.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (histogram reset count:{3}, seconds since reset: {4})", - GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), statsResetCount.Count, secondsElapsed); + GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), statsResetCount.Count, timeSnapshot.WallClockTime.TotalSeconds); - // TODO: populate user time and system time return new ClientStats { Latencies = histogramData, - TimeElapsed = secondsElapsed, - TimeUser = 0, - TimeSystem = 0 + TimeElapsed = timeSnapshot.WallClockTime.TotalSeconds, + TimeUser = timeSnapshot.UserProcessorTime.TotalSeconds, + TimeSystem = timeSnapshot.PrivilegedProcessorTime.TotalSeconds }; } diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index e1b47744d5..ea29bd74e5 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -117,7 +117,7 @@ namespace Grpc.IntegrationTesting public class ServerRunnerImpl : IServerRunner { readonly Server server; - readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); + readonly TimeStats timeStats = new TimeStats(); public ServerRunnerImpl(Server server) { @@ -138,17 +138,16 @@ namespace Grpc.IntegrationTesting /// <returns>The stats.</returns> public ServerStats GetStats(bool reset) { - var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; + var timeSnapshot = timeStats.GetSnapshot(reset); GrpcEnvironment.Logger.Info("[ServerRunner.GetStats] GC collection counts: gen0 {0}, gen1 {1}, gen2 {2}, (seconds since last reset {3})", - GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), secondsElapsed); + GC.CollectionCount(0), GC.CollectionCount(1), GC.CollectionCount(2), timeSnapshot.WallClockTime.TotalSeconds); - // TODO: populate user time and system time return new ServerStats { - TimeElapsed = secondsElapsed, - TimeUser = 0, - TimeSystem = 0 + TimeElapsed = timeSnapshot.WallClockTime.TotalSeconds, + TimeUser = timeSnapshot.UserProcessorTime.TotalSeconds, + TimeSystem = timeSnapshot.PrivilegedProcessorTime.TotalSeconds }; } diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs index 11956e4ac8..0c62380768 100644 --- a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs @@ -243,7 +243,7 @@ namespace Grpc.IntegrationTesting const string GaugeName = "csharp_overall_qps"; readonly Histogram histogram; - readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); + readonly TimeStats timeStats = new TimeStats(); public MetricsServiceImpl(Histogram histogram) { @@ -280,9 +280,9 @@ namespace Grpc.IntegrationTesting long GetQpsAndReset() { var snapshot = histogram.GetSnapshot(true); - var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true); + var timeSnapshot = timeStats.GetSnapshot(true); - return (long) (snapshot.Count / elapsedSnapshot.TotalSeconds); + return (long) (snapshot.Count / timeSnapshot.WallClockTime.TotalSeconds); } } } diff --git a/src/csharp/Grpc.IntegrationTesting/TimeStats.cs b/src/csharp/Grpc.IntegrationTesting/TimeStats.cs new file mode 100644 index 0000000000..6aba04c194 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/TimeStats.cs @@ -0,0 +1,90 @@ +#region Copyright notice and license + +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Utils; +using NUnit.Framework; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + /// <summary> + /// Snapshottable time statistics. + /// </summary> + public class TimeStats + { + readonly object myLock = new object(); + DateTime lastWallClock; + TimeSpan lastUserTime; + TimeSpan lastPrivilegedTime; + + public TimeStats() + { + lastWallClock = DateTime.UtcNow; + lastUserTime = Process.GetCurrentProcess().UserProcessorTime; + lastPrivilegedTime = Process.GetCurrentProcess().PrivilegedProcessorTime; + } + + public Snapshot GetSnapshot(bool reset) + { + lock (myLock) + { + var wallClock = DateTime.UtcNow; + var userTime = Process.GetCurrentProcess().UserProcessorTime; + var privilegedTime = Process.GetCurrentProcess().PrivilegedProcessorTime; + var snapshot = new Snapshot(wallClock - lastWallClock, userTime - lastUserTime, privilegedTime - lastPrivilegedTime); + + if (reset) + { + lastWallClock = wallClock; + lastUserTime = userTime; + lastPrivilegedTime = privilegedTime; + } + return snapshot; + } + } + + public class Snapshot + { + public TimeSpan WallClockTime { get; } + public TimeSpan UserProcessorTime { get; } + public TimeSpan PrivilegedProcessorTime { get; } + + public Snapshot(TimeSpan wallClockTime, TimeSpan userProcessorTime, TimeSpan privilegedProcessorTime) + { + this.WallClockTime = wallClockTime; + this.UserProcessorTime = userProcessorTime; + this.PrivilegedProcessorTime = privilegedProcessorTime; + } + + public override string ToString() + { + return string.Format("[TimeStats.Snapshot: wallClock {0}, userProcessor {1}, privilegedProcessor {2}]", WallClockTime, UserProcessorTime, PrivilegedProcessorTime); + } + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting/WallClockStopwatch.cs b/src/csharp/Grpc.IntegrationTesting/WallClockStopwatch.cs deleted file mode 100644 index 38b58f296c..0000000000 --- a/src/csharp/Grpc.IntegrationTesting/WallClockStopwatch.cs +++ /dev/null @@ -1,63 +0,0 @@ -#region Copyright notice and license - -// Copyright 2015 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#endregion - -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.IO; -using System.Linq; -using System.Text.RegularExpressions; -using System.Threading; -using System.Threading.Tasks; -using Google.Protobuf; -using Grpc.Core; -using Grpc.Core.Utils; -using NUnit.Framework; -using Grpc.Testing; - -namespace Grpc.IntegrationTesting -{ - /// <summary> - /// Snapshottable wall clock stopwatch. - /// </summary> - public class WallClockStopwatch - { - long startTicks; - - public WallClockStopwatch() - { - this.startTicks = DateTime.UtcNow.Ticks; - } - - public TimeSpan GetElapsedSnapshot(bool reset) - { - var utcNow = DateTime.UtcNow; - - long oldStartTicks; - if (reset) - { - oldStartTicks = Interlocked.Exchange(ref this.startTicks, utcNow.Ticks); - } - else - { - oldStartTicks = this.startTicks; - } - return utcNow - new DateTime(oldStartTicks, DateTimeKind.Utc); - } - } -} diff --git a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs index 2d1c33e9a0..eefdb50e39 100644 --- a/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/CompletionRegistryBenchmark.cs @@ -43,7 +43,7 @@ namespace Grpc.Microbenchmarks public void Run(int threadCount, int iterations, bool useSharedRegistry) { Console.WriteLine(string.Format("CompletionRegistryBenchmark: threads={0}, iterations={1}, useSharedRegistry={2}", threadCount, iterations, useSharedRegistry)); - CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment) : null; + CompletionRegistry sharedRegistry = useSharedRegistry ? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()) : null; var threadedBenchmark = new ThreadedBenchmark(threadCount, () => ThreadBody(iterations, sharedRegistry)); threadedBenchmark.Run(); // TODO: parametrize by number of pending completions @@ -51,7 +51,7 @@ namespace Grpc.Microbenchmarks private void ThreadBody(int iterations, CompletionRegistry optionalSharedRegistry) { - var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment); + var completionRegistry = optionalSharedRegistry ?? new CompletionRegistry(environment, () => BatchContextSafeHandle.Create()); var ctx = BatchContextSafeHandle.Create(); var stopwatch = Stopwatch.StartNew(); @@ -64,7 +64,7 @@ namespace Grpc.Microbenchmarks stopwatch.Stop(); Console.WriteLine("Elapsed millis: " + stopwatch.ElapsedMilliseconds); - ctx.Dispose(); + ctx.Recycle(); } private class NopCompletionCallback : IOpCompletionCallback diff --git a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs index 9cff97eb88..da4f35ff96 100644 --- a/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs +++ b/src/csharp/Grpc.Microbenchmarks/SendMessageBenchmark.cs @@ -52,10 +52,7 @@ namespace Grpc.Microbenchmarks private void ThreadBody(int iterations, int payloadSize) { - // TODO(jtattermusch): parametrize by number of pending completions. - // TODO(jtattermusch): parametrize by cached/non-cached BatchContextSafeHandle - - var completionRegistry = new CompletionRegistry(environment); + var completionRegistry = new CompletionRegistry(environment, () => environment.BatchContextPool.Lease()); var cq = CompletionQueueSafeHandle.CreateAsync(completionRegistry); var call = CreateFakeCall(cq); diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index ff013d5680..8f89e2846a 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -13,7 +13,7 @@ @rem limitations under the License. @rem Current package versions -set VERSION=1.8.0-dev +set VERSION=1.9.0-dev @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index 44a4791146..6a6cafe2bd 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -39,7 +39,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts -nuget pack Grpc.nuspec -Version "1.8.0-dev" -OutputDirectory ../../artifacts -nuget pack Grpc.Tools.nuspec -Version "1.8.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.Tools.nuspec -Version "1.9.0-dev" -OutputDirectory ../../artifacts (cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg) diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index bcb3bfaee5..24d779e1e5 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -197,10 +197,7 @@ void grpcsharp_metadata_array_move(grpc_metadata_array* dest, } GPR_EXPORT void GPR_CALLTYPE -grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) { - if (!ctx) { - return; - } +grpcsharp_batch_context_reset(grpcsharp_batch_context* ctx) { grpcsharp_metadata_array_destroy_metadata_including_entries( &(ctx->send_initial_metadata)); @@ -216,7 +213,15 @@ grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) { grpcsharp_metadata_array_destroy_metadata_only( &(ctx->recv_status_on_client.trailing_metadata)); grpc_slice_unref(ctx->recv_status_on_client.status_details); + memset(ctx, 0, sizeof(grpcsharp_batch_context)); +} +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_batch_context_destroy(grpcsharp_batch_context* ctx) { + if (!ctx) { + return; + } + grpcsharp_batch_context_reset(ctx); gpr_free(ctx); } diff --git a/src/csharp/tests.json b/src/csharp/tests.json index 7841051052..82573edecb 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -5,11 +5,13 @@ "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest", "Grpc.Core.Internal.Tests.CompletionQueueEventTest", "Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest", + "Grpc.Core.Internal.Tests.DefaultObjectPoolTest", "Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest", "Grpc.Core.Internal.Tests.TimespecTest", "Grpc.Core.Tests.AppDomainUnloadTest", "Grpc.Core.Tests.AuthContextTest", "Grpc.Core.Tests.AuthPropertyTest", + "Grpc.Core.Tests.CallCancellationTest", "Grpc.Core.Tests.CallCredentialsTest", "Grpc.Core.Tests.CallOptionsTest", "Grpc.Core.Tests.ChannelCredentialsTest", |