diff options
Diffstat (limited to 'src/csharp')
62 files changed, 956 insertions, 688 deletions
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index d92addbf54..dcdddc769e 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -235,8 +235,16 @@ namespace Grpc.Core.Tests await barrier.Task; // make sure the handler has started. cts.Cancel(); - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync); - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } } [Test] @@ -265,9 +273,15 @@ namespace Grpc.Core.Tests await handlerStartedBarrier.Task; cts.Cancel(); - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync); - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); - + try + { + await call.ResponseAsync; + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } Assert.AreEqual("SUCCESS", await successTcs.Task); } diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs index cec8c7ce6b..6a156293ad 100644 --- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs +++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs @@ -105,7 +105,15 @@ namespace Grpc.Core.Tests var parentCall = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token))); await readyToCancelTcs.Task; cts.Cancel(); - Assert.ThrowsAsync(typeof(RpcException), async () => await parentCall); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await parentCall; + Assert.Fail(); + } + catch (RpcException) + { + } Assert.AreEqual("CHILD_CALL_CANCELLED", await successTcs.Task); } diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index ab12c120cb..6fe382751a 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -32,7 +32,7 @@ #endregion using System; -using System.Threading; +using System.Linq; using Grpc.Core; using NUnit.Framework; @@ -44,7 +44,11 @@ namespace Grpc.Core.Tests public void InitializeAndShutdownGrpcEnvironment() { var env = GrpcEnvironment.AddRef(); - Assert.IsNotNull(env.CompletionQueue); + Assert.IsTrue(env.CompletionQueues.Count > 0); + for (int i = 0; i < env.CompletionQueues.Count; i++) + { + Assert.IsNotNull(env.CompletionQueues.ElementAt(i)); + } GrpcEnvironment.Release(); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs index 0e204761f6..c35aaf680f 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs @@ -53,8 +53,6 @@ namespace Grpc.Core.Internal.Tests [SetUp] public void Init() { - var environment = GrpcEnvironment.AddRef(); - // Create a fake server just so we have an instance to refer to. // The server won't actually be used at all. server = new Server() @@ -66,7 +64,6 @@ namespace Grpc.Core.Internal.Tests fakeCall = new FakeNativeCall(); asyncCallServer = new AsyncCallServer<string, string>( Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer, - environment, server); asyncCallServer.InitializeForTesting(fakeCall); } @@ -75,7 +72,6 @@ namespace Grpc.Core.Internal.Tests public void Cleanup() { server.ShutdownAsync().Wait(); - GrpcEnvironment.Release(); } [Test] @@ -136,7 +132,6 @@ namespace Grpc.Core.Internal.Tests public void WriteAfterCancelNotificationFails() { var finishedTask = asyncCallServer.ServerSideCallAsync(); - var requestStream = new ServerRequestStream<string, string>(asyncCallServer); var responseStream = new ServerResponseStream<string, string>(asyncCallServer); fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); @@ -181,6 +176,21 @@ namespace Grpc.Core.Internal.Tests AssertFinished(asyncCallServer, fakeCall, finishedTask); } + [Test] + public void WriteAfterWriteStatusThrowsInvalidOperationException() + { + var finishedTask = asyncCallServer.ServerSideCallAsync(); + var responseStream = new ServerResponseStream<string, string>(asyncCallServer); + + asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null); + Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await responseStream.WriteAsync("request1")); + + fakeCall.SendStatusFromServerHandler(true); + fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true); + + AssertFinished(asyncCallServer, fakeCall, finishedTask); + } + static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask) { Assert.IsTrue(fakeCall.IsDisposed); diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs index 777a1c8c50..81897f8c77 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -33,7 +33,6 @@ using System; using System.Collections.Generic; -using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; @@ -82,7 +81,7 @@ namespace Grpc.Core.Internal.Tests Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await asyncCall.ReadMessageAsync()); Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + () => asyncCall.SendMessageAsync("abc", new WriteFlags())); } [Test] @@ -103,7 +102,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.UnaryCallAsync("request1"); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.InvalidArgument), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -148,7 +147,7 @@ namespace Grpc.Core.Internal.Tests var resultTask = asyncCall.ClientStreamingCallAsync(); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.InvalidArgument), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument); @@ -193,7 +192,7 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.Internal), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal); @@ -211,7 +210,9 @@ namespace Grpc.Core.Internal.Tests new Metadata()); AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask); - var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(Status.DefaultSuccess, ex.Status); } @@ -223,11 +224,13 @@ namespace Grpc.Core.Internal.Tests fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange); - var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode); } @@ -267,7 +270,7 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException() + public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() { var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<string, string>(asyncCall); @@ -275,11 +278,12 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + var writeTask = requestStream.WriteAsync("request1"); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); fakeCall.UnaryResponseClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled), - CreateResponsePayload(), + null, new Metadata()); AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled); @@ -290,7 +294,7 @@ namespace Grpc.Core.Internal.Tests { asyncCall.StartServerStreamingCall("request1"); Assert.Throws(typeof(InvalidOperationException), - () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {})); + () => asyncCall.SendMessageAsync("abc", new WriteFlags())); } [Test] @@ -390,12 +394,13 @@ namespace Grpc.Core.Internal.Tests AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask); - var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1")); + var writeTask = requestStream.WriteAsync("request1"); + var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask); Assert.AreEqual(Status.DefaultSuccess, ex.Status); } [Test] - public void DuplexStreaming_CompleteAfterReceivingStatusFails() + public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds() { asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream<string, string>(asyncCall); @@ -411,7 +416,7 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException() + public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException() { asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream<string, string>(asyncCall); @@ -419,7 +424,9 @@ namespace Grpc.Core.Internal.Tests asyncCall.Cancel(); Assert.IsTrue(fakeCall.IsCancelled); - Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1")); + + var writeTask = requestStream.WriteAsync("request1"); + Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask); var readTask = responseStream.MoveNext(); fakeCall.ReceivedMessageHandler(true, null); diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs index c6843f10af..195119f920 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs @@ -60,7 +60,7 @@ namespace Grpc.Core.Internal.Tests var ev = cq.Next(); cq.Dispose(); GrpcEnvironment.Release(); - Assert.AreEqual(GRPCCompletionType.Shutdown, ev.type); + Assert.AreEqual(CompletionQueueEvent.CompletionType.Shutdown, ev.type); Assert.AreNotEqual(IntPtr.Zero, ev.success); Assert.AreEqual(IntPtr.Zero, ev.tag); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs index 74f7f2497a..c124ea29af 100644 --- a/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs +++ b/src/csharp/Grpc.Core.Tests/Internal/TimespecTest.cs @@ -61,15 +61,15 @@ namespace Grpc.Core.Internal.Tests } [Test] - public void InfFuture() + public void InfFutureMatchesNativeValue() { - var timespec = Timespec.InfFuture; + Assert.AreEqual(Timespec.NativeInfFuture, Timespec.InfFuture); } [Test] - public void InfPast() + public void InfPastMatchesNativeValue() { - var timespec = Timespec.InfPast; + Assert.AreEqual(Timespec.NativeInfPast, Timespec.InfPast); } [Test] @@ -108,7 +108,7 @@ namespace Grpc.Core.Internal.Tests Assert.Throws(typeof(InvalidOperationException), () => new Timespec(0, 1000 * 1000 * 1000).ToDateTime()); Assert.Throws(typeof(InvalidOperationException), - () => new Timespec(0, 0, GPRClockType.Monotonic).ToDateTime()); + () => new Timespec(0, 0, ClockType.Monotonic).ToDateTime()); } [Test] diff --git a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs index 0663e77d1e..d770f82390 100644 --- a/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs +++ b/src/csharp/Grpc.Core.Tests/MarshallingErrorsTest.cs @@ -134,7 +134,15 @@ namespace Grpc.Core.Tests { helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => { - Assert.ThrowsAsync<IOException>(async () => await requestStream.MoveNext()); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await requestStream.MoveNext(); + Assert.Fail(); + } + catch (IOException) + { + } return "RESPONSE"; }); diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 93a6e6a3d9..886adfec33 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -31,7 +31,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -56,6 +55,7 @@ namespace Grpc.Core readonly string target; readonly GrpcEnvironment environment; + readonly CompletionQueueSafeHandle completionQueue; readonly ChannelSafeHandle handle; readonly Dictionary<string, ChannelOption> options; @@ -75,6 +75,7 @@ namespace Grpc.Core EnsureUserAgentChannelOption(this.options); this.environment = GrpcEnvironment.AddRef(); + this.completionQueue = this.environment.PickCompletionQueue(); using (var nativeCredentials = credentials.ToNativeCredentials()) using (var nativeChannelArgs = ChannelOptions.CreateChannelArgs(this.options.Values)) { @@ -135,7 +136,7 @@ namespace Grpc.Core tcs.SetCanceled(); } }); - handle.WatchConnectivityState(lastObservedState, deadlineTimespec, environment.CompletionQueue, environment.CompletionRegistry, handler); + handle.WatchConnectivityState(lastObservedState, deadlineTimespec, completionQueue, handler); return tcs.Task; } @@ -231,6 +232,14 @@ namespace Grpc.Core } } + internal CompletionQueueSafeHandle CompletionQueue + { + get + { + return this.completionQueue; + } + } + internal void AddCallReference(object call) { activeCallCounter.Increment(); diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 95077a6ca5..a8b7b5f00d 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -74,7 +74,6 @@ <Compile Include="Internal\CallSafeHandle.cs" /> <Compile Include="Internal\ChannelSafeHandle.cs" /> <Compile Include="Internal\CompletionQueueSafeHandle.cs" /> - <Compile Include="Internal\Enums.cs" /> <Compile Include="Internal\SafeHandleZeroIsInvalid.cs" /> <Compile Include="Internal\Timespec.cs" /> <Compile Include="Internal\GrpcThreadPool.cs" /> @@ -87,7 +86,6 @@ <Compile Include="Utils\BenchmarkUtil.cs" /> <Compile Include="ChannelCredentials.cs" /> <Compile Include="Internal\ChannelArgsSafeHandle.cs" /> - <Compile Include="Internal\AsyncCompletion.cs" /> <Compile Include="Internal\AsyncCallBase.cs" /> <Compile Include="Internal\AsyncCallServer.cs" /> <Compile Include="Internal\AsyncCall.cs" /> @@ -134,6 +132,10 @@ <Compile Include="DefaultCallInvoker.cs" /> <Compile Include="Internal\UnimplementedCallInvoker.cs" /> <Compile Include="Internal\InterceptingCallInvoker.cs" /> + <Compile Include="Internal\ServerRpcNew.cs" /> + <Compile Include="Internal\ClientSideStatus.cs" /> + <Compile Include="Internal\ClockType.cs" /> + <Compile Include="Internal\CallError.cs" /> </ItemGroup> <ItemGroup> <None Include="Grpc.Core.nuspec" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index bee0ef1d62..18af1099f1 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -32,8 +32,9 @@ #endregion using System; +using System.Collections.Generic; +using System.Linq; using System.Runtime.InteropServices; -using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Utils; @@ -51,12 +52,13 @@ namespace Grpc.Core static GrpcEnvironment instance; static int refCount; static int? customThreadPoolSize; + static int? customCompletionQueueCount; static ILogger logger = new ConsoleLogger(); readonly GrpcThreadPool threadPool; - readonly CompletionRegistry completionRegistry; readonly DebugStats debugStats = new DebugStats(); + readonly AtomicCounter cqPickerCounter = new AtomicCounter(); bool isClosed; /// <summary> @@ -141,36 +143,51 @@ namespace Grpc.Core } /// <summary> + /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events. + /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards. + /// Setting the number of completions queues is an advanced setting and you should only use it if you know what you are doing. + /// Most users should rely on the default value provided by gRPC library. + /// Note: this method is part of an experimental API that can change or be removed without any prior notice. + /// </summary> + public static void SetCompletionQueueCount(int completionQueueCount) + { + lock (staticLock) + { + GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized"); + GrpcPreconditions.CheckArgument(completionQueueCount > 0, "threadCount needs to be a positive number"); + customCompletionQueueCount = completionQueueCount; + } + } + + /// <summary> /// Creates gRPC environment. /// </summary> private GrpcEnvironment() { GrpcNativeInit(); - completionRegistry = new CompletionRegistry(this); - threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault()); + threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault()); threadPool.Start(); } /// <summary> - /// Gets the completion registry used by this gRPC environment. + /// Gets the completion queues used by this gRPC environment. /// </summary> - internal CompletionRegistry CompletionRegistry + internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues { get { - return this.completionRegistry; + return this.threadPool.CompletionQueues; } } /// <summary> - /// Gets the completion queue used by this gRPC environment. + /// Picks a completion queue in a round-robin fashion. + /// Shouldn't be invoked on a per-call basis (used at per-channel basis). /// </summary> - internal CompletionQueueSafeHandle CompletionQueue + internal CompletionQueueSafeHandle PickCompletionQueue() { - get - { - return this.threadPool.CompletionQueue; - } + var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count); + return this.threadPool.CompletionQueues.ElementAt(cqIndex); } /// <summary> @@ -230,5 +247,15 @@ namespace Grpc.Core // more work, but seems to work reasonably well for a start. return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2); } + + private int GetCompletionQueueCountOrDefault() + { + if (customCompletionQueueCount.HasValue) + { + return customCompletionQueueCount.Value; + } + // by default, create a completion queue for each thread + return GetThreadPoolSizeOrDefault(); + } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 55351869b5..895be690a5 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -32,12 +32,7 @@ #endregion using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Threading; using System.Threading.Tasks; -using Grpc.Core.Internal; using Grpc.Core.Logging; using Grpc.Core.Profiling; using Grpc.Core.Utils; @@ -57,9 +52,11 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; + // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls. // Indicates that response streaming call has finished. TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); + // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers). // Response headers set here once received. TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>(); @@ -67,7 +64,7 @@ namespace Grpc.Core.Internal ClientSideStatus? finishedStatus; public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) - : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment) + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) { this.details = callDetails.WithOptions(callDetails.Options.Normalize()); this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. @@ -144,7 +141,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); halfcloseRequested = true; readingDone = true; @@ -171,7 +168,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); readingDone = true; @@ -195,7 +192,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); halfcloseRequested = true; @@ -220,7 +217,7 @@ namespace Grpc.Core.Internal GrpcPreconditions.CheckState(!started); started = true; - Initialize(environment.CompletionQueue); + Initialize(details.Channel.CompletionQueue); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { @@ -232,11 +229,10 @@ namespace Grpc.Core.Internal /// <summary> /// Sends a streaming request. Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) + public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags) { - StartSendMessageInternal(msg, writeFlags, completionDelegate); + return SendMessageInternalAsync(msg, writeFlags); } /// <summary> @@ -250,29 +246,32 @@ namespace Grpc.Core.Internal /// <summary> /// Sends halfclose, indicating client is done with streaming requests. /// Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate) + public Task SendCloseFromClientAsync() { lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: true); + GrpcPreconditions.CheckState(started); - if (!disposed && !finished) + var earlyResult = CheckSendPreconditionsClientSide(); + if (earlyResult != null) { - call.StartSendCloseFromClient(HandleSendCloseFromClientFinished); + return earlyResult; } - else + + if (disposed || finished) { // In case the call has already been finished by the serverside, - // the halfclose has already been done implicitly, so we only - // emit the notification for the completion delegate. - Task.Run(() => HandleSendCloseFromClientFinished(true)); + // the halfclose has already been done implicitly, so just return + // completed task here. + halfcloseRequested = true; + return Task.FromResult<object>(null); } + call.StartSendCloseFromClient(HandleSendCloseFromClientFinished); halfcloseRequested = true; - sendCompletionDelegate = completionDelegate; + streamingWriteTcs = new TaskCompletionSource<object>(); + return streamingWriteTcs.Task; } } @@ -342,6 +341,45 @@ namespace Grpc.Core.Internal get { return true; } } + protected override Task CheckSendAllowedOrEarlyResult() + { + var earlyResult = CheckSendPreconditionsClientSide(); + if (earlyResult != null) + { + return earlyResult; + } + + if (finishedStatus.HasValue) + { + // throwing RpcException if we already received status on client + // side makes the most sense. + // Note that this throws even for StatusCode.OK. + // Writing after the call has finished is not a programming error because server can close + // the call anytime, so don't throw directly, but let the write task finish with an error. + var tcs = new TaskCompletionSource<object>(); + tcs.SetException(new RpcException(finishedStatus.Value.Status)); + return tcs.Task; + } + + return null; + } + + private Task CheckSendPreconditionsClientSide() + { + GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time."); + + if (cancelRequested) + { + // Return a cancelled task. + var tcs = new TaskCompletionSource<object>(); + tcs.SetCanceled(); + return tcs.Task; + } + + return null; + } + private void Initialize(CompletionQueueSafeHandle cq) { using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize")) @@ -368,7 +406,7 @@ namespace Grpc.Core.Internal var credentials = details.Options.Credentials; using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null) { - var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry, + var result = details.Channel.Handle.CreateCall( parentCall, ContextPropagationToken.DefaultMask, cq, details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials); return result; @@ -400,6 +438,7 @@ namespace Grpc.Core.Internal /// </summary> private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders) { + // TODO(jtattermusch): handle success==false responseHeadersTcs.SetResult(responseHeaders); } @@ -443,19 +482,6 @@ namespace Grpc.Core.Internal } } - protected override void CheckSendingAllowed(bool allowFinished) - { - base.CheckSendingAllowed(true); - - // throwing RpcException if we already received status on client - // side makes the most sense. - // Note that this throws even for StatusCode.OK. - if (!allowFinished && finishedStatus.HasValue) - { - throw new RpcException(finishedStatus.Value.Status); - } - } - /// <summary> /// Handles receive status completion for calls with streaming response. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 4de23706b2..cb8366c216 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -58,7 +58,6 @@ namespace Grpc.Core.Internal readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; - protected readonly GrpcEnvironment environment; protected readonly object myLock = new object(); protected INativeCall call; @@ -67,8 +66,8 @@ namespace Grpc.Core.Internal protected bool started; protected bool cancelRequested; - protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null. + protected TaskCompletionSource<object> streamingWriteTcs; // Completion of a pending streaming write or send close from client if not null. protected TaskCompletionSource<object> sendStatusFromServerTcs; protected bool readingDone; // True if last read (i.e. read with null payload) was already received. @@ -78,11 +77,10 @@ namespace Grpc.Core.Internal protected bool initialMetadataSent; protected long streamingWritesCounter; // Number of streaming send operations started so far. - public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) { this.serializer = GrpcPreconditions.CheckNotNull(serializer); this.deserializer = GrpcPreconditions.CheckNotNull(deserializer); - this.environment = GrpcPreconditions.CheckNotNull(environment); } /// <summary> @@ -128,28 +126,31 @@ namespace Grpc.Core.Internal /// <summary> /// Initiates sending a message. Only one send operation can be active at a time. - /// completionDelegate is invoked upon completion. /// </summary> - protected void StartSendMessageInternal(TWrite msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) + protected Task SendMessageInternalAsync(TWrite msg, WriteFlags writeFlags) { byte[] payload = UnsafeSerialize(msg); lock (myLock) { - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); - CheckSendingAllowed(allowFinished: false); + GrpcPreconditions.CheckState(started); + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } call.StartSendMessage(HandleSendFinished, payload, writeFlags, !initialMetadataSent); - sendCompletionDelegate = completionDelegate; initialMetadataSent = true; streamingWritesCounter++; + streamingWriteTcs = new TaskCompletionSource<object>(); + return streamingWriteTcs.Task; } } /// <summary> /// Initiates reading a message. Only one read operation can be active at a time. - /// completionDelegate is invoked upon completion. /// </summary> protected Task<TRead> ReadMessageInternalAsync() { @@ -159,7 +160,7 @@ namespace Grpc.Core.Internal if (readingDone) { // the last read that returns null or throws an exception is idempotent - // and maintain its state. + // and maintains its state. GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads."); return streamingReadTcs.Task; } @@ -183,7 +184,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); + bool noMoreSendCompletions = streamingWriteTcs == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -213,24 +214,11 @@ namespace Grpc.Core.Internal { } - protected virtual void CheckSendingAllowed(bool allowFinished) - { - GrpcPreconditions.CheckState(started); - CheckNotCancelled(); - GrpcPreconditions.CheckState(!disposed || allowFinished); - - GrpcPreconditions.CheckState(!halfcloseRequested, "Already halfclosed."); - GrpcPreconditions.CheckState(!finished || allowFinished, "Already finished."); - GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); - } - - protected void CheckNotCancelled() - { - if (cancelRequested) - { - throw new OperationCanceledException("Remote call has been cancelled."); - } - } + /// <summary> + /// Checks if sending is allowed and possibly returns a Task that allows short-circuiting the send + /// logic by directly returning the write operation result task. Normally, null is returned. + /// </summary> + protected abstract Task CheckSendAllowedOrEarlyResult(); protected byte[] UnsafeSerialize(TWrite msg) { @@ -259,39 +247,27 @@ namespace Grpc.Core.Internal } } - protected void FireCompletion<T>(AsyncCompletionDelegate<T> completionDelegate, T value, Exception error) - { - try - { - completionDelegate(value, error); - } - catch (Exception e) - { - Logger.Error(e, "Exception occured while invoking completion delegate."); - } - } - /// <summary> /// Handles send completion. /// </summary> protected void HandleSendFinished(bool success) { - AsyncCompletionDelegate<object> origCompletionDelegate = null; + TaskCompletionSource<object> origTcs = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; + origTcs = streamingWriteTcs; + streamingWriteTcs = null; ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Send failed")); + origTcs.SetException(new InvalidOperationException("Send failed")); } else { - FireCompletion(origCompletionDelegate, null, null); + origTcs.SetResult(null); } } @@ -300,22 +276,23 @@ namespace Grpc.Core.Internal /// </summary> protected void HandleSendCloseFromClientFinished(bool success) { - AsyncCompletionDelegate<object> origCompletionDelegate = null; + TaskCompletionSource<object> origTcs = null; lock (myLock) { - origCompletionDelegate = sendCompletionDelegate; - sendCompletionDelegate = null; + origTcs = streamingWriteTcs; + streamingWriteTcs = null; ReleaseResourcesIfPossible(); } if (!success) { - FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Sending close from client has failed.")); + // TODO(jtattermusch): this method is same as HandleSendFinished (only the error message differs). + origTcs.SetException(new InvalidOperationException("Sending close from client has failed.")); } else { - FireCompletion(origCompletionDelegate, null, null); + origTcs.SetResult(null); } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index b1566b44a7..56c23ba3ef 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -51,14 +51,14 @@ namespace Grpc.Core.Internal readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly Server server; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, Server server) : base(serializer, deserializer) { this.server = GrpcPreconditions.CheckNotNull(server); } - public void Initialize(CallSafeHandle call) + public void Initialize(CallSafeHandle call, CompletionQueueSafeHandle completionQueue) { - call.Initialize(environment.CompletionRegistry, environment.CompletionQueue); + call.Initialize(completionQueue); server.AddCallReference(this); InitializeInternal(call); @@ -91,11 +91,10 @@ namespace Grpc.Core.Internal /// <summary> /// Sends a streaming response. Only one pending send action is allowed at any given time. - /// completionDelegate is called when the operation finishes. /// </summary> - public void StartSendMessage(TResponse msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate) + public Task SendMessageAsync(TResponse msg, WriteFlags writeFlags) { - StartSendMessageInternal(msg, writeFlags, completionDelegate); + return SendMessageInternalAsync(msg, writeFlags); } /// <summary> @@ -110,20 +109,22 @@ namespace Grpc.Core.Internal /// Initiates sending a initial metadata. /// Even though C-core allows sending metadata in parallel to sending messages, we will treat sending metadata as a send message operation /// to make things simpler. - /// completionDelegate is invoked upon completion. /// </summary> - public void StartSendInitialMetadata(Metadata headers, AsyncCompletionDelegate<object> completionDelegate) + public Task SendInitialMetadataAsync(Metadata headers) { lock (myLock) { GrpcPreconditions.CheckNotNull(headers, "metadata"); - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + GrpcPreconditions.CheckState(started); GrpcPreconditions.CheckState(!initialMetadataSent, "Response headers can only be sent once per call."); GrpcPreconditions.CheckState(streamingWritesCounter == 0, "Response headers can only be sent before the first write starts."); - CheckSendingAllowed(allowFinished: false); - GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + var earlyResult = CheckSendAllowedOrEarlyResult(); + if (earlyResult != null) + { + return earlyResult; + } using (var metadataArray = MetadataArraySafeHandle.Create(headers)) { @@ -131,7 +132,8 @@ namespace Grpc.Core.Internal } this.initialMetadataSent = true; - sendCompletionDelegate = completionDelegate; + streamingWriteTcs = new TaskCompletionSource<object>(); + return streamingWriteTcs.Task; } } @@ -196,6 +198,16 @@ namespace Grpc.Core.Internal server.RemoveCallReference(this); } + protected override Task CheckSendAllowedOrEarlyResult() + { + GrpcPreconditions.CheckState(!halfcloseRequested, "Response stream has already been completed."); + GrpcPreconditions.CheckState(!finished, "Already finished."); + GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time"); + GrpcPreconditions.CheckState(!disposed); + + return null; + } + /// <summary> /// Handles the server side close completion. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 66d2a66f99..c28a6f64d3 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -120,107 +120,4 @@ namespace Grpc.Core.Internal return true; } } - - /// <summary> - /// Status + metadata received on client side when call finishes. - /// (when receive_status_on_client operation finishes). - /// </summary> - internal struct ClientSideStatus - { - readonly Status status; - readonly Metadata trailers; - - public ClientSideStatus(Status status, Metadata trailers) - { - this.status = status; - this.trailers = trailers; - } - - public Status Status - { - get - { - return this.status; - } - } - - public Metadata Trailers - { - get - { - return this.trailers; - } - } - } - - /// <summary> - /// Details of a newly received RPC. - /// </summary> - internal struct ServerRpcNew - { - readonly Server server; - readonly CallSafeHandle call; - readonly string method; - readonly string host; - readonly Timespec deadline; - readonly Metadata requestMetadata; - - public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) - { - this.server = server; - this.call = call; - this.method = method; - this.host = host; - this.deadline = deadline; - this.requestMetadata = requestMetadata; - } - - public Server Server - { - get - { - return this.server; - } - } - - public CallSafeHandle Call - { - get - { - return this.call; - } - } - - public string Method - { - get - { - return this.method; - } - } - - public string Host - { - get - { - return this.host; - } - } - - public Timespec Deadline - { - get - { - return this.deadline; - } - } - - public Metadata RequestMetadata - { - get - { - return this.requestMetadata; - } - } - } } diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/CallError.cs index 74f86d2a30..541575f5e6 100644 --- a/src/csharp/Grpc.Core/Internal/Enums.cs +++ b/src/csharp/Grpc.Core/Internal/CallError.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal /// <summary> /// grpc_call_error from grpc/grpc.h /// </summary> - internal enum GRPCCallError + internal enum CallError { /* everything went ok */ OK = 0, @@ -70,42 +70,9 @@ namespace Grpc.Core.Internal /// <summary> /// Checks the call API invocation's result is OK. /// </summary> - public static void CheckOk(this GRPCCallError callError) + public static void CheckOk(this CallError callError) { - GrpcPreconditions.CheckState(callError == GRPCCallError.OK, "Call error: " + callError); + GrpcPreconditions.CheckState(callError == CallError.OK, "Call error: " + callError); } } - - /// <summary> - /// grpc_completion_type from grpc/grpc.h - /// </summary> - internal enum GRPCCompletionType - { - /* Shutting down */ - Shutdown, - - /* No event before timeout */ - Timeout, - - /* operation completion */ - OpComplete - } - - /// <summary> - /// gpr_clock_type from grpc/support/time.h - /// </summary> - internal enum GPRClockType - { - /* Monotonic clock */ - Monotonic, - - /* Realtime clock */ - Realtime, - - /* Precise clock good for performance profiling. */ - Precise, - - /* Timespan - the distance between two time points */ - Timespan - } } diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 244b97d4a4..82361f5797 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -47,16 +47,14 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); const uint GRPC_WRITE_BUFFER_HINT = 1; - CompletionRegistry completionRegistry; CompletionQueueSafeHandle completionQueue; private CallSafeHandle() { } - public void Initialize(CompletionRegistry completionRegistry, CompletionQueueSafeHandle completionQueue) + public void Initialize(CompletionQueueSafeHandle completionQueue) { - this.completionRegistry = completionRegistry; this.completionQueue = completionQueue; } @@ -70,7 +68,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); Native.grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -90,7 +88,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); Native.grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } } @@ -100,7 +98,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); Native.grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } } @@ -110,7 +108,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); Native.grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } } @@ -120,7 +118,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } } @@ -130,7 +128,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } } @@ -142,7 +140,7 @@ namespace Grpc.Core.Internal { var ctx = BatchContextSafeHandle.Create(); var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero; - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata, optionalPayload, optionalPayloadLength, writeFlags).CheckOk(); } @@ -153,7 +151,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); Native.grpcsharp_call_recv_message(this, ctx).CheckOk(); } } @@ -163,7 +161,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); Native.grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); } } @@ -173,7 +171,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); Native.grpcsharp_call_start_serverside(this, ctx).CheckOk(); } } @@ -183,7 +181,7 @@ namespace Grpc.Core.Internal using (completionQueue.NewScope()) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); Native.grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 1dbd1f4e34..62864dff0c 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -63,7 +63,7 @@ namespace Grpc.Core.Internal return Native.grpcsharp_secure_channel_create(credentials, target, channelArgs); } - public CallSafeHandle CreateCall(CompletionRegistry registry, CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials) + public CallSafeHandle CreateCall(CallSafeHandle parentCall, ContextPropagationFlags propagationMask, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline, CallCredentialsSafeHandle credentials) { using (Profilers.ForCurrentThread().NewScope("ChannelSafeHandle.CreateCall")) { @@ -72,7 +72,7 @@ namespace Grpc.Core.Internal { result.SetCredentials(credentials); } - result.Initialize(registry, cq); + result.Initialize(cq); return result; } } @@ -82,11 +82,10 @@ namespace Grpc.Core.Internal return Native.grpcsharp_channel_check_connectivity_state(this, tryToConnect ? 1 : 0); } - public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, - CompletionRegistry completionRegistry, BatchCompletionDelegate callback) + public void WatchConnectivityState(ChannelState lastObservedState, Timespec deadline, CompletionQueueSafeHandle cq, BatchCompletionDelegate callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + cq.CompletionRegistry.RegisterBatchCompletion(ctx, callback); Native.grpcsharp_channel_watch_connectivity_state(this, lastObservedState, deadline, cq, ctx); } diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs index 013f00ff6f..924de028f5 100644 --- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs @@ -50,16 +50,12 @@ namespace Grpc.Core.Internal public Task WriteAsync(TRequest message) { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendMessageAsync(message, GetWriteFlags()); } public Task CompleteAsync() { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendCloseFromClient(taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendCloseFromClientAsync(); } public WriteOptions WriteOptions diff --git a/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs new file mode 100644 index 0000000000..5727e3f11f --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ClientSideStatus.cs @@ -0,0 +1,70 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Status + metadata received on client side when call finishes. + /// (when receive_status_on_client operation finishes). + /// </summary> + internal struct ClientSideStatus + { + readonly Status status; + readonly Metadata trailers; + + public ClientSideStatus(Status status, Metadata trailers) + { + this.status = status; + this.trailers = trailers; + } + + public Status Status + { + get + { + return this.status; + } + } + + public Metadata Trailers + { + get + { + return this.trailers; + } + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ClockType.cs b/src/csharp/Grpc.Core/Internal/ClockType.cs new file mode 100644 index 0000000000..57533c9d2f --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ClockType.cs @@ -0,0 +1,53 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +namespace Grpc.Core.Internal +{ + /// <summary> + /// gpr_clock_type from grpc/support/time.h + /// </summary> + internal enum ClockType + { + /* Monotonic clock */ + Monotonic, + + /* Realtime clock */ + Realtime, + + /* Precise clock good for performance profiling. */ + Precise, + + /* Timespan - the distance between two time points */ + Timespan + } +} diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs index 288680792a..a78e9b70f3 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueEvent.cs @@ -44,7 +44,7 @@ namespace Grpc.Core.Internal { static readonly NativeMethods Native = NativeMethods.Get(); - public GRPCCompletionType type; + public CompletionType type; public int success; public IntPtr tag; @@ -55,5 +55,20 @@ namespace Grpc.Core.Internal return Native.grpcsharp_sizeof_grpc_event(); } } + + /// <summary> + /// grpc_completion_type from grpc/grpc.h + /// </summary> + internal enum CompletionType + { + /* Shutting down */ + Shutdown, + + /* No event before timeout */ + Timeout, + + /* operation completion */ + OpComplete + } } } diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs index 91364cdc70..46f5624223 100644 --- a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -45,6 +45,7 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); AtomicCounter shutdownRefcount = new AtomicCounter(1); + CompletionRegistry completionRegistry; private CompletionQueueSafeHandle() { @@ -53,7 +54,13 @@ namespace Grpc.Core.Internal public static CompletionQueueSafeHandle Create() { return Native.grpcsharp_completion_queue_create(); + } + public static CompletionQueueSafeHandle Create(CompletionRegistry completionRegistry) + { + var cq = Native.grpcsharp_completion_queue_create(); + cq.completionRegistry = completionRegistry; + return cq; } public CompletionQueueEvent Next() @@ -83,6 +90,15 @@ namespace Grpc.Core.Internal DecrementShutdownRefcount(); } + /// <summary> + /// Completion registry associated with this completion queue. + /// Doesn't need to be set if only using Pluck() operations. + /// </summary> + public CompletionRegistry CompletionRegistry + { + get { return completionRegistry; } + } + protected override bool ReleaseHandle() { Native.grpcsharp_completion_queue_destroy(handle); diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index 4b7124ee74..4de543bef7 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -33,15 +33,15 @@ using System; using System.Collections.Generic; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading; -using System.Threading.Tasks; using Grpc.Core.Logging; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// Pool of threads polling on the same completion queue. + /// Pool of threads polling on a set of completions queues. /// </summary> internal class GrpcThreadPool { @@ -51,25 +51,31 @@ namespace Grpc.Core.Internal readonly object myLock = new object(); readonly List<Thread> threads = new List<Thread>(); readonly int poolSize; + readonly int completionQueueCount; - CompletionQueueSafeHandle cq; + IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues; - public GrpcThreadPool(GrpcEnvironment environment, int poolSize) + /// <summary> + /// Creates a thread pool threads polling on a set of completions queues. + /// </summary> + /// <param name="environment">Environment.</param> + /// <param name="poolSize">Pool size.</param> + /// <param name="completionQueueCount">Completion queue count.</param> + public GrpcThreadPool(GrpcEnvironment environment, int poolSize, int completionQueueCount) { this.environment = environment; this.poolSize = poolSize; + this.completionQueueCount = completionQueueCount; + GrpcPreconditions.CheckArgument(poolSize >= completionQueueCount, + "Thread pool size cannot be smaller than the number of completion queues used."); } public void Start() { lock (myLock) { - if (cq != null) - { - throw new InvalidOperationException("Already started."); - } - - cq = CompletionQueueSafeHandle.Create(); + GrpcPreconditions.CheckState(completionQueues == null, "Already started."); + completionQueues = CreateCompletionQueueList(environment, completionQueueCount); for (int i = 0; i < poolSize; i++) { @@ -82,49 +88,60 @@ namespace Grpc.Core.Internal { lock (myLock) { - cq.Shutdown(); + foreach (var cq in completionQueues) + { + cq.Shutdown(); + } + foreach (var thread in threads) { thread.Join(); } - cq.Dispose(); + foreach (var cq in completionQueues) + { + cq.Dispose(); + } } } - internal CompletionQueueSafeHandle CompletionQueue + internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues { get { - return cq; + return completionQueues; } } - private Thread CreateAndStartThread(int i) + private Thread CreateAndStartThread(int threadIndex) { - var thread = new Thread(new ThreadStart(RunHandlerLoop)); + var cqIndex = threadIndex % completionQueues.Count; + var cq = completionQueues.ElementAt(cqIndex); + + var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq))); thread.IsBackground = false; + thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex); thread.Start(); - thread.Name = "grpc " + i; + return thread; } /// <summary> /// Body of the polling thread. /// </summary> - private void RunHandlerLoop() + private void RunHandlerLoop(CompletionQueueSafeHandle cq) { CompletionQueueEvent ev; do { ev = cq.Next(); - if (ev.type == GRPCCompletionType.OpComplete) + if (ev.type == CompletionQueueEvent.CompletionType.OpComplete) { bool success = (ev.success != 0); IntPtr tag = ev.tag; try { - var callback = environment.CompletionRegistry.Extract(tag); + var callback = cq.CompletionRegistry.Extract(tag); callback(success); } catch (Exception e) @@ -133,7 +150,18 @@ namespace Grpc.Core.Internal } } } - while (ev.type != GRPCCompletionType.Shutdown); + while (ev.type != CompletionQueueEvent.CompletionType.Shutdown); + } + + private static IReadOnlyCollection<CompletionQueueSafeHandle> CreateCompletionQueueList(GrpcEnvironment environment, int completionQueueCount) + { + var list = new List<CompletionQueueSafeHandle>(); + for (int i = 0; i < completionQueueCount; i++) + { + var completionRegistry = new CompletionRegistry(environment); + list.Add(CompletionQueueSafeHandle.Create(completionRegistry)); + } + return list.AsReadOnly(); } } } diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs index 25735d5262..dc9f62fdab 100644 --- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs @@ -50,6 +50,11 @@ namespace Grpc.Core.Internal { using (Profilers.ForCurrentThread().NewScope("MetadataArraySafeHandle.Create")) { + if (metadata.Count == 0) + { + return new MetadataArraySafeHandle(); + } + // TODO(jtattermusch): we might wanna check that the metadata is readonly var metadataArray = Native.grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count)); for (int i = 0; i < metadata.Count; i++) diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs index c277c73ef0..65607ed120 100644 --- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs +++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs @@ -137,6 +137,7 @@ namespace Grpc.Core.Internal public readonly Delegates.grpcsharp_server_credentials_release_delegate grpcsharp_server_credentials_release; public readonly Delegates.grpcsharp_server_create_delegate grpcsharp_server_create; + public readonly Delegates.grpcsharp_server_register_completion_queue_delegate grpcsharp_server_register_completion_queue; public readonly Delegates.grpcsharp_server_add_insecure_http2_port_delegate grpcsharp_server_add_insecure_http2_port; public readonly Delegates.grpcsharp_server_add_secure_http2_port_delegate grpcsharp_server_add_secure_http2_port; public readonly Delegates.grpcsharp_server_start_delegate grpcsharp_server_start; @@ -244,6 +245,7 @@ namespace Grpc.Core.Internal this.grpcsharp_server_credentials_release = GetMethodDelegate<Delegates.grpcsharp_server_credentials_release_delegate>(library); this.grpcsharp_server_create = GetMethodDelegate<Delegates.grpcsharp_server_create_delegate>(library); + this.grpcsharp_server_register_completion_queue = GetMethodDelegate<Delegates.grpcsharp_server_register_completion_queue_delegate>(library); this.grpcsharp_server_add_insecure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_insecure_http2_port_delegate>(library); this.grpcsharp_server_add_secure_http2_port = GetMethodDelegate<Delegates.grpcsharp_server_add_secure_http2_port_delegate>(library); this.grpcsharp_server_start = GetMethodDelegate<Delegates.grpcsharp_server_start_delegate>(library); @@ -348,6 +350,7 @@ namespace Grpc.Core.Internal this.grpcsharp_server_credentials_release = PInvokeMethods.grpcsharp_server_credentials_release; this.grpcsharp_server_create = PInvokeMethods.grpcsharp_server_create; + this.grpcsharp_server_register_completion_queue = PInvokeMethods.grpcsharp_server_register_completion_queue; this.grpcsharp_server_add_insecure_http2_port = PInvokeMethods.grpcsharp_server_add_insecure_http2_port; this.grpcsharp_server_add_secure_http2_port = PInvokeMethods.grpcsharp_server_add_secure_http2_port; this.grpcsharp_server_start = PInvokeMethods.grpcsharp_server_start; @@ -418,33 +421,33 @@ namespace Grpc.Core.Internal public delegate CallCredentialsSafeHandle grpcsharp_composite_call_credentials_create_delegate(CallCredentialsSafeHandle creds1, CallCredentialsSafeHandle creds2); public delegate void grpcsharp_call_credentials_release_delegate(IntPtr credentials); - public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call); - public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description); - public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_cancel_delegate(CallSafeHandle call); + public delegate CallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description); + public delegate CallError grpcsharp_call_start_unary_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); - public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); - public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); - public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); - public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_send_message_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); - public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); - public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); - public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_recv_message_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); - public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); - public delegate GRPCCallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_start_serverside_delegate(CallSafeHandle call, BatchContextSafeHandle ctx); - public delegate GRPCCallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call, + public delegate CallError grpcsharp_call_send_initial_metadata_delegate(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); - public delegate GRPCCallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials); + public delegate CallError grpcsharp_call_set_credentials_delegate(CallSafeHandle call, CallCredentialsSafeHandle credentials); public delegate CStringSafeHandle grpcsharp_call_get_peer_delegate(CallSafeHandle call); public delegate void grpcsharp_call_destroy_delegate(IntPtr call); @@ -493,23 +496,24 @@ namespace Grpc.Core.Internal public delegate ServerCredentialsSafeHandle grpcsharp_ssl_server_credentials_create_delegate(string pemRootCerts, string[] keyCertPairCertChainArray, string[] keyCertPairPrivateKeyArray, UIntPtr numKeyCertPairs, bool forceClientAuth); public delegate void grpcsharp_server_credentials_release_delegate(IntPtr credentials); - public delegate ServerSafeHandle grpcsharp_server_create_delegate(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); + public delegate ServerSafeHandle grpcsharp_server_create_delegate(ChannelArgsSafeHandle args); + public delegate void grpcsharp_server_register_completion_queue_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq); public delegate int grpcsharp_server_add_insecure_http2_port_delegate(ServerSafeHandle server, string addr); public delegate int grpcsharp_server_add_secure_http2_port_delegate(ServerSafeHandle server, string addr, ServerCredentialsSafeHandle creds); public delegate void grpcsharp_server_start_delegate(ServerSafeHandle server); - public delegate GRPCCallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); + public delegate CallError grpcsharp_server_request_call_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); public delegate void grpcsharp_server_cancel_all_calls_delegate(ServerSafeHandle server); public delegate void grpcsharp_server_shutdown_and_notify_callback_delegate(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); public delegate void grpcsharp_server_destroy_delegate(IntPtr server); - public delegate Timespec gprsharp_now_delegate(GPRClockType clockType); - public delegate Timespec gprsharp_inf_future_delegate(GPRClockType clockType); - public delegate Timespec gprsharp_inf_past_delegate(GPRClockType clockType); + public delegate Timespec gprsharp_now_delegate(ClockType clockType); + public delegate Timespec gprsharp_inf_future_delegate(ClockType clockType); + public delegate Timespec gprsharp_inf_past_delegate(ClockType clockType); - public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, GPRClockType targetClock); + public delegate Timespec gprsharp_convert_clock_type_delegate(Timespec t, ClockType targetClock); public delegate int gprsharp_sizeof_timespec_delegate(); - public delegate GRPCCallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback); + public delegate CallError grpcsharp_test_callback_delegate([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback); public delegate IntPtr grpcsharp_test_nop_delegate(IntPtr ptr); } @@ -587,59 +591,59 @@ namespace Grpc.Core.Internal // CallSafeHandle [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); + public static extern CallError grpcsharp_call_cancel(CallSafeHandle call); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); + public static extern CallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, + public static extern CallError grpcsharp_call_start_unary(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, + public static extern CallError grpcsharp_call_start_client_streaming(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, + public static extern CallError grpcsharp_call_start_server_streaming(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, + public static extern CallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, + public static extern CallError grpcsharp_call_send_message(CallSafeHandle call, BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, + public static extern CallError grpcsharp_call_send_close_from_client(CallSafeHandle call, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, + public static extern CallError grpcsharp_call_send_status_from_server(CallSafeHandle call, BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, + public static extern CallError grpcsharp_call_recv_message(CallSafeHandle call, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call, + public static extern CallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, + public static extern CallError grpcsharp_call_start_serverside(CallSafeHandle call, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_send_initial_metadata(CallSafeHandle call, + public static extern CallError grpcsharp_call_send_initial_metadata(CallSafeHandle call, BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials); + public static extern CallError grpcsharp_call_set_credentials(CallSafeHandle call, CallCredentialsSafeHandle credentials); [DllImport("grpc_csharp_ext.dll")] public static extern CStringSafeHandle grpcsharp_call_get_peer(CallSafeHandle call); @@ -773,7 +777,10 @@ namespace Grpc.Core.Internal // ServerSafeHandle [DllImport("grpc_csharp_ext.dll")] - public static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args); + public static extern ServerSafeHandle grpcsharp_server_create(ChannelArgsSafeHandle args); + + [DllImport("grpc_csharp_ext.dll")] + public static extern void grpcsharp_server_register_completion_queue(ServerSafeHandle server, CompletionQueueSafeHandle cq); [DllImport("grpc_csharp_ext.dll")] public static extern int grpcsharp_server_add_insecure_http2_port(ServerSafeHandle server, string addr); @@ -785,7 +792,7 @@ namespace Grpc.Core.Internal public static extern void grpcsharp_server_start(ServerSafeHandle server); [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); + public static extern CallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] public static extern void grpcsharp_server_cancel_all_calls(ServerSafeHandle server); @@ -799,16 +806,16 @@ namespace Grpc.Core.Internal // Timespec [DllImport("grpc_csharp_ext.dll")] - public static extern Timespec gprsharp_now(GPRClockType clockType); + public static extern Timespec gprsharp_now(ClockType clockType); [DllImport("grpc_csharp_ext.dll")] - public static extern Timespec gprsharp_inf_future(GPRClockType clockType); + public static extern Timespec gprsharp_inf_future(ClockType clockType); [DllImport("grpc_csharp_ext.dll")] - public static extern Timespec gprsharp_inf_past(GPRClockType clockType); + public static extern Timespec gprsharp_inf_past(ClockType clockType); [DllImport("grpc_csharp_ext.dll")] - public static extern Timespec gprsharp_convert_clock_type(Timespec t, GPRClockType targetClock); + public static extern Timespec gprsharp_convert_clock_type(Timespec t, ClockType targetClock); [DllImport("grpc_csharp_ext.dll")] public static extern int gprsharp_sizeof_timespec(); @@ -816,7 +823,7 @@ namespace Grpc.Core.Internal // Testing [DllImport("grpc_csharp_ext.dll")] - public static extern GRPCCallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback); + public static extern CallError grpcsharp_test_callback([MarshalAs(UnmanagedType.FunctionPtr)] OpCompletionDelegate callback); [DllImport("grpc_csharp_ext.dll")] public static extern IntPtr grpcsharp_test_nop(IntPtr ptr); diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 85b7a4b01e..6a2f520163 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -44,7 +44,7 @@ namespace Grpc.Core.Internal { internal interface IServerCallHandler { - Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment); + Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq); } internal class UnaryServerCallHandler<TRequest, TResponse> : IServerCallHandler @@ -62,14 +62,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -121,14 +121,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -179,14 +179,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -237,14 +237,14 @@ namespace Grpc.Core.Internal this.handler = handler; } - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment, newRpc.Server); + newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); var requestStream = new ServerRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall); @@ -281,13 +281,13 @@ namespace Grpc.Core.Internal { public static readonly NoSuchMethodCallHandler Instance = new NoSuchMethodCallHandler(); - public async Task HandleCall(ServerRpcNew newRpc, GrpcEnvironment environment) + public async Task HandleCall(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload, environment, newRpc.Server); + (payload) => payload, (payload) => payload, newRpc.Server); - asyncCall.Initialize(newRpc.Call); + asyncCall.Initialize(newRpc.Call, cq); var finishedTask = asyncCall.ServerSideCallAsync(); await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false); await finishedTask.ConfigureAwait(false); @@ -317,7 +317,7 @@ namespace Grpc.Core.Internal where TRequest : class where TResponse : class { - DateTime realtimeDeadline = newRpc.Deadline.ToClockType(GPRClockType.Realtime).ToDateTime(); + DateTime realtimeDeadline = newRpc.Deadline.ToClockType(ClockType.Realtime).ToDateTime(); return new ServerCallContext(newRpc.Call, newRpc.Method, newRpc.Host, peer, realtimeDeadline, newRpc.RequestMetadata, cancellationToken, serverResponseStream.WriteResponseHeadersAsync, serverResponseStream); diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs index ecfee0bfdd..25b79b4398 100644 --- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs @@ -52,16 +52,12 @@ namespace Grpc.Core.Internal public Task WriteAsync(TResponse message) { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendMessage(message, GetWriteFlags(), taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendMessageAsync(message, GetWriteFlags()); } public Task WriteResponseHeadersAsync(Metadata responseHeaders) { - var taskSource = new AsyncCompletionTaskSource<object>(); - call.StartSendInitialMetadata(responseHeaders, taskSource.CompletionDelegate); - return taskSource.Task; + return call.SendInitialMetadataAsync(responseHeaders); } public WriteOptions WriteOptions diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs index 7e86fddb4d..e4f1880bdb 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs +++ b/src/csharp/Grpc.Core/Internal/ServerRpcNew.cs @@ -32,63 +32,78 @@ #endregion using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; -using Grpc.Core.Internal; -using Grpc.Core.Utils; +using Grpc.Core; namespace Grpc.Core.Internal { /// <summary> - /// If error != null, there's been an error or operation has been cancelled. + /// Details of a newly received RPC. /// </summary> - internal delegate void AsyncCompletionDelegate<T>(T result, Exception error); - - /// <summary> - /// Helper for transforming AsyncCompletionDelegate into full-fledged Task. - /// </summary> - internal class AsyncCompletionTaskSource<T> + internal struct ServerRpcNew { - readonly TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); - readonly AsyncCompletionDelegate<T> completionDelegate; + readonly Server server; + readonly CallSafeHandle call; + readonly string method; + readonly string host; + readonly Timespec deadline; + readonly Metadata requestMetadata; + + public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) + { + this.server = server; + this.call = call; + this.method = method; + this.host = host; + this.deadline = deadline; + this.requestMetadata = requestMetadata; + } + + public Server Server + { + get + { + return this.server; + } + } - public AsyncCompletionTaskSource() + public CallSafeHandle Call { - completionDelegate = new AsyncCompletionDelegate<T>(HandleCompletion); + get + { + return this.call; + } } - public Task<T> Task + public string Method { get { - return tcs.Task; + return this.method; } } - public AsyncCompletionDelegate<T> CompletionDelegate + public string Host { get { - return completionDelegate; + return this.host; } } - private void HandleCompletion(T value, Exception error) + public Timespec Deadline { - if (error == null) + get { - tcs.SetResult(value); - return; + return this.deadline; } - if (error is OperationCanceledException) + } + + public Metadata RequestMetadata + { + get { - tcs.SetCanceled(); - return; + return this.requestMetadata; } - tcs.SetException(error); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index 6b5f70e220..8581302706 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -31,12 +31,6 @@ #endregion -using System; -using System.Collections.Concurrent; -using System.Diagnostics; -using System.Runtime.InteropServices; -using Grpc.Core.Utils; - namespace Grpc.Core.Internal { /// <summary> @@ -50,12 +44,17 @@ namespace Grpc.Core.Internal { } - public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) + public static ServerSafeHandle NewServer(ChannelArgsSafeHandle args) { // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. // Doing so would make object finalizer crash if we end up abandoning the handle. GrpcEnvironment.GrpcNativeInit(); - return Native.grpcsharp_server_create(cq, args); + return Native.grpcsharp_server_create(args); + } + + public void RegisterCompletionQueue(CompletionQueueSafeHandle cq) + { + Native.grpcsharp_server_register_completion_queue(this, cq); } public int AddInsecurePort(string addr) @@ -73,18 +72,18 @@ namespace Grpc.Core.Internal Native.grpcsharp_server_start(this); } - public void ShutdownAndNotify(BatchCompletionDelegate callback, GrpcEnvironment environment) + public void ShutdownAndNotify(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { var ctx = BatchContextSafeHandle.Create(); - environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - Native.grpcsharp_server_shutdown_and_notify_callback(this, environment.CompletionQueue, ctx); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + Native.grpcsharp_server_shutdown_and_notify_callback(this, completionQueue, ctx); } - public void RequestCall(BatchCompletionDelegate callback, GrpcEnvironment environment) + public void RequestCall(BatchCompletionDelegate callback, CompletionQueueSafeHandle completionQueue) { var ctx = BatchContextSafeHandle.Create(); - environment.CompletionRegistry.RegisterBatchCompletion(ctx, callback); - Native.grpcsharp_server_request_call(this, environment.CompletionQueue, ctx).CheckOk(); + completionQueue.CompletionRegistry.RegisterBatchCompletion(ctx, callback); + Native.grpcsharp_server_request_call(this, completionQueue, ctx).CheckOk(); } protected override bool ReleaseHandle() diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs index 56172a5dda..c9fd710e1e 100644 --- a/src/csharp/Grpc.Core/Internal/Timespec.cs +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -49,11 +49,11 @@ namespace Grpc.Core.Internal static readonly NativeMethods Native = NativeMethods.Get(); static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc); - public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, GPRClockType.Realtime) + public Timespec(long tv_sec, int tv_nsec) : this(tv_sec, tv_nsec, ClockType.Realtime) { } - public Timespec(long tv_sec, int tv_nsec, GPRClockType clock_type) + public Timespec(long tv_sec, int tv_nsec, ClockType clock_type) { this.tv_sec = tv_sec; this.tv_nsec = tv_nsec; @@ -62,7 +62,7 @@ namespace Grpc.Core.Internal private long tv_sec; private int tv_nsec; - private GPRClockType clock_type; + private ClockType clock_type; /// <summary> /// Timespec a long time in the future. @@ -71,7 +71,7 @@ namespace Grpc.Core.Internal { get { - return Native.gprsharp_inf_future(GPRClockType.Realtime); + return new Timespec(long.MaxValue, 0, ClockType.Realtime); } } @@ -82,7 +82,7 @@ namespace Grpc.Core.Internal { get { - return Native.gprsharp_inf_past(GPRClockType.Realtime); + return new Timespec(long.MinValue, 0, ClockType.Realtime); } } @@ -93,7 +93,7 @@ namespace Grpc.Core.Internal { get { - return Native.gprsharp_now(GPRClockType.Realtime); + return Native.gprsharp_now(ClockType.Realtime); } } @@ -122,7 +122,7 @@ namespace Grpc.Core.Internal /// <summary> /// Converts the timespec to desired clock type. /// </summary> - public Timespec ToClockType(GPRClockType targetClock) + public Timespec ToClockType(ClockType targetClock) { return Native.gprsharp_convert_clock_type(this, targetClock); } @@ -142,7 +142,7 @@ namespace Grpc.Core.Internal public DateTime ToDateTime() { GrpcPreconditions.CheckState(tv_nsec >= 0 && tv_nsec < NanosPerSecond); - GrpcPreconditions.CheckState(clock_type == GPRClockType.Realtime); + GrpcPreconditions.CheckState(clock_type == ClockType.Realtime); // fast path for InfFuture if (this.Equals(InfFuture)) @@ -227,10 +227,11 @@ namespace Grpc.Core.Internal { get { - return Native.gprsharp_now(GPRClockType.Precise); + return Native.gprsharp_now(ClockType.Precise); } } + // for tests only internal static int NativeSize { get @@ -238,5 +239,23 @@ namespace Grpc.Core.Internal return Native.gprsharp_sizeof_timespec(); } } + + // for tests only + internal static Timespec NativeInfFuture + { + get + { + return Native.gprsharp_inf_future(ClockType.Realtime); + } + } + + // for tests only + public static Timespec NativeInfPast + { + get + { + return Native.gprsharp_inf_past(ClockType.Realtime); + } + } } } diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs index bde74945fb..370fa98687 100644 --- a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs +++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs @@ -16,6 +16,12 @@ using System.Runtime.CompilerServices; "0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" + "27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" + "71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")] +[assembly: InternalsVisibleTo("Grpc.IntegrationTesting,PublicKey=" + + "00240000048000009400000006020000002400005253413100040000010001002f5797a92c6fcde81bd4098f43" + + "0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" + + "27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" + + "71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")] #else [assembly: InternalsVisibleTo("Grpc.Core.Tests")] +[assembly: InternalsVisibleTo("Grpc.IntegrationTesting")] #endif diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index d538a4671f..069185e13a 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -34,8 +34,7 @@ using System; using System.Collections; using System.Collections.Generic; -using System.Diagnostics; -using System.Runtime.InteropServices; +using System.Linq; using System.Threading.Tasks; using Grpc.Core.Internal; using Grpc.Core.Logging; @@ -48,7 +47,7 @@ namespace Grpc.Core /// </summary> public class Server { - const int InitialAllowRpcTokenCount = 10; + const int InitialAllowRpcTokenCountPerCq = 10; static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); readonly AtomicCounter activeCallCounter = new AtomicCounter(); @@ -80,7 +79,12 @@ namespace Grpc.Core this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { - this.handle = ServerSafeHandle.NewServer(environment.CompletionQueue, channelArgs); + this.handle = ServerSafeHandle.NewServer(channelArgs); + } + + foreach (var cq in environment.CompletionQueues) + { + this.handle.RegisterCompletionQueue(cq); } } @@ -133,9 +137,12 @@ namespace Grpc.Core // Starting with more than one AllowOneRpc tokens can significantly increase // unary RPC throughput. - for (int i = 0; i < InitialAllowRpcTokenCount; i++) + for (int i = 0; i < InitialAllowRpcTokenCountPerCq; i++) { - AllowOneRpc(); + foreach (var cq in environment.CompletionQueues) + { + AllowOneRpc(cq); + } } } } @@ -154,7 +161,8 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(HandleServerShutdown, environment); + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); await shutdownTcs.Task.ConfigureAwait(false); DisposeHandle(); @@ -174,7 +182,8 @@ namespace Grpc.Core shutdownRequested = true; } - handle.ShutdownAndNotify(HandleServerShutdown, environment); + var cq = environment.CompletionQueues.First(); // any cq will do + handle.ShutdownAndNotify(HandleServerShutdown, cq); handle.CancelAllCalls(); await shutdownTcs.Task.ConfigureAwait(false); DisposeHandle(); @@ -244,11 +253,11 @@ namespace Grpc.Core /// <summary> /// Allows one new RPC call to be received by server. /// </summary> - private void AllowOneRpc() + private void AllowOneRpc(CompletionQueueSafeHandle cq) { if (!shutdownRequested) { - handle.RequestCall(HandleNewServerRpc, environment); + handle.RequestCall((success, ctx) => HandleNewServerRpc(success, ctx, cq), cq); } } @@ -265,7 +274,7 @@ namespace Grpc.Core /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> - private async Task HandleCallAsync(ServerRpcNew newRpc) + private async Task HandleCallAsync(ServerRpcNew newRpc, CompletionQueueSafeHandle cq) { try { @@ -274,7 +283,7 @@ namespace Grpc.Core { callHandler = NoSuchMethodCallHandler.Instance; } - await callHandler.HandleCall(newRpc, environment).ConfigureAwait(false); + await callHandler.HandleCall(newRpc, cq).ConfigureAwait(false); } catch (Exception e) { @@ -285,9 +294,9 @@ namespace Grpc.Core /// <summary> /// Handles the native callback. /// </summary> - private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx) + private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx, CompletionQueueSafeHandle cq) { - Task.Run(() => AllowOneRpc()); + Task.Run(() => AllowOneRpc(cq)); if (success) { @@ -296,7 +305,7 @@ namespace Grpc.Core // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) { - HandleCallAsync(newRpc); // we don't need to await. + HandleCallAsync(newRpc, cq); // we don't need to await. } } } diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index cfe668b6be..3fd28c6528 100644 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -37,7 +37,7 @@ <ItemGroup> <Reference Include="System" /> <Reference Include="Google.Protobuf"> - <HintPath>..\packages\Google.Protobuf.3.0.0-beta2\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> + <HintPath>..\packages\Google.Protobuf.3.0.0-beta3\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> </Reference> <Reference Include="nunit.framework"> <HintPath>..\packages\NUnit.3.2.0\lib\net45\nunit.framework.dll</HintPath> diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config index ce030f9d77..668601af8e 100644 --- a/src/csharp/Grpc.Examples.Tests/packages.config +++ b/src/csharp/Grpc.Examples.Tests/packages.config @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <packages> - <package id="Google.Protobuf" version="3.0.0-beta2" targetFramework="net45" /> + <package id="Google.Protobuf" version="3.0.0-beta3" targetFramework="net45" /> <package id="Ix-Async" version="1.2.5" targetFramework="net45" /> <package id="NUnit" version="3.2.0" targetFramework="net45" /> <package id="NUnitLite" version="3.2.0" targetFramework="net45" /> diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj index f0a0aa3a26..30170ab03c 100644 --- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj +++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj @@ -39,7 +39,7 @@ <ItemGroup> <Reference Include="Google.Protobuf, Version=3.0.0.0, Culture=neutral, PublicKeyToken=a7d26565bac4d604, processorArchitecture=MSIL"> <SpecificVersion>False</SpecificVersion> - <HintPath>..\packages\Google.Protobuf.3.0.0-beta2\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> + <HintPath>..\packages\Google.Protobuf.3.0.0-beta3\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> </Reference> <Reference Include="nunit.framework"> <HintPath>..\packages\NUnit.3.2.0\lib\net45\nunit.framework.dll</HintPath> diff --git a/src/csharp/Grpc.Examples/Math.cs b/src/csharp/Grpc.Examples/Math.cs index 33c4f8d9c0..a17228c8c5 100644 --- a/src/csharp/Grpc.Examples/Math.cs +++ b/src/csharp/Grpc.Examples/Math.cs @@ -34,12 +34,12 @@ namespace Math { "Mw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedCodeInfo(null, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Math.DivArgs), global::Math.DivArgs.Parser, new[]{ "Dividend", "Divisor" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Math.DivReply), global::Math.DivReply.Parser, new[]{ "Quotient", "Remainder" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Math.FibArgs), global::Math.FibArgs.Parser, new[]{ "Limit" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Math.Num), global::Math.Num.Parser, new[]{ "Num_" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Math.FibReply), global::Math.FibReply.Parser, new[]{ "Count" }, null, null, null) + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Math.DivArgs), global::Math.DivArgs.Parser, new[]{ "Dividend", "Divisor" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Math.DivReply), global::Math.DivReply.Parser, new[]{ "Quotient", "Remainder" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Math.FibArgs), global::Math.FibArgs.Parser, new[]{ "Limit" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Math.Num), global::Math.Num.Parser, new[]{ "Num_" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Math.FibReply), global::Math.FibReply.Parser, new[]{ "Count" }, null, null, null) })); } #endregion diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config index a424cd2ea0..a70dcbd4c6 100644 --- a/src/csharp/Grpc.Examples/packages.config +++ b/src/csharp/Grpc.Examples/packages.config @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <packages> - <package id="Google.Protobuf" version="3.0.0-beta2" targetFramework="net45" /> + <package id="Google.Protobuf" version="3.0.0-beta3" targetFramework="net45" /> <package id="Ix-Async" version="1.2.5" targetFramework="net45" /> <package id="NUnit" version="3.2.0" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj b/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj index 0bea9c03e7..a5ee4fdb46 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj +++ b/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj @@ -45,7 +45,7 @@ <Reference Include="System.Data" /> <Reference Include="System.Xml" /> <Reference Include="Google.Protobuf"> - <HintPath>..\packages\Google.Protobuf.3.0.0-beta2\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> + <HintPath>..\packages\Google.Protobuf.3.0.0-beta3\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> </Reference> <Reference Include="nunit.framework"> <HintPath>..\packages\NUnit.3.2.0\lib\net45\nunit.framework.dll</HintPath> diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs index fb292945a6..070674bae9 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs @@ -79,16 +79,17 @@ namespace Grpc.HealthCheck.Tests [Test] public void ServiceIsRunning() { - serviceImpl.SetStatus("", HealthCheckResponse.Types.ServingStatus.SERVING); + serviceImpl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving); var response = client.Check(new HealthCheckRequest { Service = "" }); - Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.SERVING, response.Status); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Serving, response.Status); } [Test] public void ServiceDoesntExist() { - Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => client.Check(new HealthCheckRequest { Service = "nonexistent.service" })); + var ex = Assert.Throws<RpcException>(() => client.Check(new HealthCheckRequest { Service = "nonexistent.service" })); + Assert.AreEqual(StatusCode.NotFound, ex.Status.StatusCode); } // TODO(jtattermusch): add test with timeout once timeouts are supported diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs index a4b79e3a7d..15703604ba 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthServiceImplTest.cs @@ -50,38 +50,39 @@ namespace Grpc.HealthCheck.Tests public void SetStatus() { var impl = new HealthServiceImpl(); - impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.SERVING); - Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.SERVING, GetStatusHelper(impl, "")); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Serving, GetStatusHelper(impl, "")); - impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.NOT_SERVING); - Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.NOT_SERVING, GetStatusHelper(impl, "")); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.NotServing); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.NotServing, GetStatusHelper(impl, "")); - impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.UNKNOWN); - Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.UNKNOWN, GetStatusHelper(impl, "")); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Unknown); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Unknown, GetStatusHelper(impl, "")); - impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.SERVING); - Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.SERVING, GetStatusHelper(impl, "grpc.test.TestService")); + impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.Serving); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Serving, GetStatusHelper(impl, "grpc.test.TestService")); } [Test] public void ClearStatus() { var impl = new HealthServiceImpl(); - impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.SERVING); - impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.UNKNOWN); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving); + impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.Unknown); impl.ClearStatus(""); - Assert.Throws(Is.TypeOf(typeof(RpcException)).And.Property("Status").Property("StatusCode").EqualTo(StatusCode.NotFound), () => GetStatusHelper(impl, "")); - Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.UNKNOWN, GetStatusHelper(impl, "grpc.test.TestService")); + var ex = Assert.Throws<RpcException>(() => GetStatusHelper(impl, "")); + Assert.AreEqual(StatusCode.NotFound, ex.Status.StatusCode); + Assert.AreEqual(HealthCheckResponse.Types.ServingStatus.Unknown, GetStatusHelper(impl, "grpc.test.TestService")); } [Test] public void ClearAll() { var impl = new HealthServiceImpl(); - impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.SERVING); - impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.UNKNOWN); + impl.SetStatus("", HealthCheckResponse.Types.ServingStatus.Serving); + impl.SetStatus("grpc.test.TestService", HealthCheckResponse.Types.ServingStatus.Unknown); impl.ClearAll(); Assert.Throws(typeof(RpcException), () => GetStatusHelper(impl, "")); @@ -92,7 +93,7 @@ namespace Grpc.HealthCheck.Tests public void NullsRejected() { var impl = new HealthServiceImpl(); - Assert.Throws(typeof(ArgumentNullException), () => impl.SetStatus(null, HealthCheckResponse.Types.ServingStatus.SERVING)); + Assert.Throws(typeof(ArgumentNullException), () => impl.SetStatus(null, HealthCheckResponse.Types.ServingStatus.Serving)); Assert.Throws(typeof(ArgumentNullException), () => impl.ClearStatus(null)); } diff --git a/src/csharp/Grpc.HealthCheck.Tests/packages.config b/src/csharp/Grpc.HealthCheck.Tests/packages.config index 8066d8fceb..2bcfec8829 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/packages.config +++ b/src/csharp/Grpc.HealthCheck.Tests/packages.config @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <packages> - <package id="Google.Protobuf" version="3.0.0-beta2" targetFramework="net45" /> + <package id="Google.Protobuf" version="3.0.0-beta3" targetFramework="net45" /> <package id="NUnit" version="3.2.0" targetFramework="net45" /> <package id="NUnitLite" version="3.2.0" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj index 498528aa18..2697b74f59 100644 --- a/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj +++ b/src/csharp/Grpc.HealthCheck/Grpc.HealthCheck.csproj @@ -40,7 +40,7 @@ <ItemGroup> <Reference Include="Google.Protobuf, Version=3.0.0.0, Culture=neutral, PublicKeyToken=a7d26565bac4d604, processorArchitecture=MSIL"> <SpecificVersion>False</SpecificVersion> - <HintPath>..\packages\Google.Protobuf.3.0.0-beta2\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> + <HintPath>..\packages\Google.Protobuf.3.0.0-beta3\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> </Reference> <Reference Include="System" /> <Reference Include="System.Core" /> diff --git a/src/csharp/Grpc.HealthCheck/Health.cs b/src/csharp/Grpc.HealthCheck/Health.cs index d0d0c0b519..100ad187d7 100644 --- a/src/csharp/Grpc.HealthCheck/Health.cs +++ b/src/csharp/Grpc.HealthCheck/Health.cs @@ -33,9 +33,9 @@ namespace Grpc.Health.V1 { "Ag5HcnBjLkhlYWx0aC5WMWIGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedCodeInfo(null, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Health.V1.HealthCheckRequest), global::Grpc.Health.V1.HealthCheckRequest.Parser, new[]{ "Service" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Health.V1.HealthCheckResponse), global::Grpc.Health.V1.HealthCheckResponse.Parser, new[]{ "Status" }, null, new[]{ typeof(global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus) }, null) + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Health.V1.HealthCheckRequest), global::Grpc.Health.V1.HealthCheckRequest.Parser, new[]{ "Service" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Health.V1.HealthCheckResponse), global::Grpc.Health.V1.HealthCheckResponse.Parser, new[]{ "Status" }, null, new[]{ typeof(global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus) }, null) })); } #endregion @@ -75,7 +75,7 @@ namespace Grpc.Health.V1 { public string Service { get { return service_; } set { - service_ = pb::Preconditions.CheckNotNull(value, "value"); + service_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -174,7 +174,7 @@ namespace Grpc.Health.V1 { /// <summary>Field number for the "status" field.</summary> public const int StatusFieldNumber = 1; - private global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus status_ = global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.UNKNOWN; + private global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus status_ = 0; public global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus Status { get { return status_; } set { @@ -199,7 +199,7 @@ namespace Grpc.Health.V1 { public override int GetHashCode() { int hash = 1; - if (Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.UNKNOWN) hash ^= Status.GetHashCode(); + if (Status != 0) hash ^= Status.GetHashCode(); return hash; } @@ -208,7 +208,7 @@ namespace Grpc.Health.V1 { } public void WriteTo(pb::CodedOutputStream output) { - if (Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.UNKNOWN) { + if (Status != 0) { output.WriteRawTag(8); output.WriteEnum((int) Status); } @@ -216,7 +216,7 @@ namespace Grpc.Health.V1 { public int CalculateSize() { int size = 0; - if (Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.UNKNOWN) { + if (Status != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) Status); } return size; @@ -226,7 +226,7 @@ namespace Grpc.Health.V1 { if (other == null) { return; } - if (other.Status != global::Grpc.Health.V1.HealthCheckResponse.Types.ServingStatus.UNKNOWN) { + if (other.Status != 0) { Status = other.Status; } } @@ -251,9 +251,9 @@ namespace Grpc.Health.V1 { [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] public static partial class Types { public enum ServingStatus { - UNKNOWN = 0, - SERVING = 1, - NOT_SERVING = 2, + [pbr::OriginalName("UNKNOWN")] Unknown = 0, + [pbr::OriginalName("SERVING")] Serving = 1, + [pbr::OriginalName("NOT_SERVING")] NotServing = 2, } } diff --git a/src/csharp/Grpc.HealthCheck/packages.config b/src/csharp/Grpc.HealthCheck/packages.config index 358a978ba9..a52d9e508f 100644 --- a/src/csharp/Grpc.HealthCheck/packages.config +++ b/src/csharp/Grpc.HealthCheck/packages.config @@ -1,5 +1,5 @@ <?xml version="1.0" encoding="utf-8"?> <packages> - <package id="Google.Protobuf" version="3.0.0-beta2" targetFramework="net45" /> + <package id="Google.Protobuf" version="3.0.0-beta3" targetFramework="net45" /> <package id="Ix-Async" version="1.2.5" targetFramework="net45" /> </packages>
\ No newline at end of file diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs index 9eaf6bf7ce..39b9ae08e6 100644 --- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.IO; @@ -41,7 +42,9 @@ using System.Threading; using System.Threading.Tasks; using Google.Protobuf; using Grpc.Core; +using Grpc.Core.Internal; using Grpc.Core.Logging; +using Grpc.Core.Profiling; using Grpc.Core.Utils; using NUnit.Framework; using Grpc.Testing; @@ -55,6 +58,15 @@ namespace Grpc.IntegrationTesting { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>(); + // Profilers to use for clients. + static readonly BlockingCollection<BasicProfiler> profilers = new BlockingCollection<BasicProfiler>(); + + internal static void AddProfiler(BasicProfiler profiler) + { + GrpcPreconditions.CheckNotNull(profiler); + profilers.Add(profiler); + } + /// <summary> /// Creates a started client runner. /// </summary> @@ -83,7 +95,8 @@ namespace Grpc.IntegrationTesting config.OutstandingRpcsPerChannel, config.LoadParams, config.PayloadConfig, - config.HistogramParams); + config.HistogramParams, + () => GetNextProfiler()); } private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams) @@ -110,9 +123,16 @@ namespace Grpc.IntegrationTesting } return result; } + + private static BasicProfiler GetNextProfiler() + { + BasicProfiler result = null; + profilers.TryTake(out result); + return result; + } } - public class ClientRunnerImpl : IClientRunner + internal class ClientRunnerImpl : IClientRunner { const double SecondsToNanos = 1e9; @@ -125,8 +145,9 @@ namespace Grpc.IntegrationTesting readonly List<Task> runnerTasks; readonly CancellationTokenSource stoppedCts = new CancellationTokenSource(); readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); + readonly AtomicCounter statsResetCount = new AtomicCounter(); - public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams) + public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams, Func<BasicProfiler> profilerFactory) { GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel"); GrpcPreconditions.CheckNotNull(histogramParams, "histogramParams"); @@ -142,7 +163,8 @@ namespace Grpc.IntegrationTesting for (int i = 0; i < outstandingRpcsPerChannel; i++) { var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel); - this.runnerTasks.Add(RunClientAsync(channel, timer)); + var optionalProfiler = profilerFactory(); + this.runnerTasks.Add(RunClientAsync(channel, timer, optionalProfiler)); } } } @@ -152,6 +174,11 @@ namespace Grpc.IntegrationTesting var histogramData = histogram.GetSnapshot(reset); var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds; + if (reset) + { + statsResetCount.Increment(); + } + // TODO: populate user time and system time return new ClientStats { @@ -175,14 +202,28 @@ namespace Grpc.IntegrationTesting } } - private void RunUnary(Channel channel, IInterarrivalTimer timer) + private void RunUnary(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler) { + if (optionalProfiler != null) + { + Profilers.SetForCurrentThread(optionalProfiler); + } + + bool profilerReset = false; + var client = BenchmarkService.NewClient(channel); var request = CreateSimpleRequest(); var stopwatch = new Stopwatch(); while (!stoppedCts.Token.IsCancellationRequested) { + // after the first stats reset, also reset the profiler. + if (optionalProfiler != null && !profilerReset && statsResetCount.Count > 0) + { + optionalProfiler.Reset(); + profilerReset = true; + } + stopwatch.Restart(); client.UnaryCall(request); stopwatch.Stop(); @@ -268,29 +309,29 @@ namespace Grpc.IntegrationTesting } } - private Task RunClientAsync(Channel channel, IInterarrivalTimer timer) + private Task RunClientAsync(Channel channel, IInterarrivalTimer timer, BasicProfiler optionalProfiler) { if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams) { - GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API"); - GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls"); + GrpcPreconditions.CheckArgument(clientType == ClientType.AsyncClient, "Generic client only supports async API"); + GrpcPreconditions.CheckArgument(rpcType == RpcType.Streaming, "Generic client only supports streaming calls"); return RunGenericStreamingAsync(channel, timer); } GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams); - if (clientType == ClientType.SYNC_CLIENT) + if (clientType == ClientType.SyncClient) { - GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#"); + GrpcPreconditions.CheckArgument(rpcType == RpcType.Unary, "Sync client can only be used for Unary calls in C#"); // create a dedicated thread for the synchronous client - return Task.Factory.StartNew(() => RunUnary(channel, timer), TaskCreationOptions.LongRunning); + return Task.Factory.StartNew(() => RunUnary(channel, timer, optionalProfiler), TaskCreationOptions.LongRunning); } - else if (clientType == ClientType.ASYNC_CLIENT) + else if (clientType == ClientType.AsyncClient) { switch (rpcType) { - case RpcType.UNARY: + case RpcType.Unary: return RunUnaryAsync(channel, timer); - case RpcType.STREAMING: + case RpcType.Streaming: return RunStreamingPingPongAsync(channel, timer); } } diff --git a/src/csharp/Grpc.IntegrationTesting/Control.cs b/src/csharp/Grpc.IntegrationTesting/Control.cs index 3fa8d43f38..412f800ff9 100644 --- a/src/csharp/Grpc.IntegrationTesting/Control.cs +++ b/src/csharp/Grpc.IntegrationTesting/Control.cs @@ -85,25 +85,25 @@ namespace Grpc.Testing { "RUFNSU5HEAFiBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Grpc.Testing.PayloadsReflection.Descriptor, global::Grpc.Testing.StatsReflection.Descriptor, }, - new pbr::GeneratedCodeInfo(new[] {typeof(global::Grpc.Testing.ClientType), typeof(global::Grpc.Testing.ServerType), typeof(global::Grpc.Testing.RpcType), }, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.PoissonParams), global::Grpc.Testing.PoissonParams.Parser, new[]{ "OfferedLoad" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClosedLoopParams), global::Grpc.Testing.ClosedLoopParams.Parser, null, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.LoadParams), global::Grpc.Testing.LoadParams.Parser, new[]{ "ClosedLoop", "Poisson" }, new[]{ "Load" }, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.SecurityParams), global::Grpc.Testing.SecurityParams.Parser, new[]{ "UseTestCa", "ServerHostOverride" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientConfig), global::Grpc.Testing.ClientConfig.Parser, new[]{ "ServerTargets", "ClientType", "SecurityParams", "OutstandingRpcsPerChannel", "ClientChannels", "AsyncClientThreads", "RpcType", "LoadParams", "PayloadConfig", "HistogramParams", "CoreList", "CoreLimit", "OtherClientApi" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientStatus), global::Grpc.Testing.ClientStatus.Parser, new[]{ "Stats" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.Mark), global::Grpc.Testing.Mark.Parser, new[]{ "Reset" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientArgs), global::Grpc.Testing.ClientArgs.Parser, new[]{ "Setup", "Mark" }, new[]{ "Argtype" }, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerConfig), global::Grpc.Testing.ServerConfig.Parser, new[]{ "ServerType", "SecurityParams", "Port", "AsyncServerThreads", "CoreLimit", "PayloadConfig", "CoreList", "OtherServerApi" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerArgs), global::Grpc.Testing.ServerArgs.Parser, new[]{ "Setup", "Mark" }, new[]{ "Argtype" }, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerStatus), global::Grpc.Testing.ServerStatus.Parser, new[]{ "Stats", "Port", "Cores" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.CoreRequest), global::Grpc.Testing.CoreRequest.Parser, null, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.CoreResponse), global::Grpc.Testing.CoreResponse.Parser, new[]{ "Cores" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.Void), global::Grpc.Testing.Void.Parser, null, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.Scenario), global::Grpc.Testing.Scenario.Parser, new[]{ "Name", "ClientConfig", "NumClients", "ServerConfig", "NumServers", "WarmupSeconds", "BenchmarkSeconds", "SpawnLocalWorkerCount" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.Scenarios), global::Grpc.Testing.Scenarios.Parser, new[]{ "Scenarios_" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ScenarioResultSummary), global::Grpc.Testing.ScenarioResultSummary.Parser, new[]{ "Qps", "QpsPerServerCore", "ServerSystemTime", "ServerUserTime", "ClientSystemTime", "ClientUserTime", "Latency50", "Latency90", "Latency95", "Latency99", "Latency999" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ScenarioResult), global::Grpc.Testing.ScenarioResult.Parser, new[]{ "Scenario", "Latencies", "ClientStats", "ServerStats", "ServerCores", "Summary" }, null, null, null) + new pbr::GeneratedClrTypeInfo(new[] {typeof(global::Grpc.Testing.ClientType), typeof(global::Grpc.Testing.ServerType), typeof(global::Grpc.Testing.RpcType), }, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.PoissonParams), global::Grpc.Testing.PoissonParams.Parser, new[]{ "OfferedLoad" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ClosedLoopParams), global::Grpc.Testing.ClosedLoopParams.Parser, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.LoadParams), global::Grpc.Testing.LoadParams.Parser, new[]{ "ClosedLoop", "Poisson" }, new[]{ "Load" }, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.SecurityParams), global::Grpc.Testing.SecurityParams.Parser, new[]{ "UseTestCa", "ServerHostOverride" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ClientConfig), global::Grpc.Testing.ClientConfig.Parser, new[]{ "ServerTargets", "ClientType", "SecurityParams", "OutstandingRpcsPerChannel", "ClientChannels", "AsyncClientThreads", "RpcType", "LoadParams", "PayloadConfig", "HistogramParams", "CoreList", "CoreLimit", "OtherClientApi" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ClientStatus), global::Grpc.Testing.ClientStatus.Parser, new[]{ "Stats" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.Mark), global::Grpc.Testing.Mark.Parser, new[]{ "Reset" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ClientArgs), global::Grpc.Testing.ClientArgs.Parser, new[]{ "Setup", "Mark" }, new[]{ "Argtype" }, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ServerConfig), global::Grpc.Testing.ServerConfig.Parser, new[]{ "ServerType", "SecurityParams", "Port", "AsyncServerThreads", "CoreLimit", "PayloadConfig", "CoreList", "OtherServerApi" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ServerArgs), global::Grpc.Testing.ServerArgs.Parser, new[]{ "Setup", "Mark" }, new[]{ "Argtype" }, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ServerStatus), global::Grpc.Testing.ServerStatus.Parser, new[]{ "Stats", "Port", "Cores" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.CoreRequest), global::Grpc.Testing.CoreRequest.Parser, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.CoreResponse), global::Grpc.Testing.CoreResponse.Parser, new[]{ "Cores" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.Void), global::Grpc.Testing.Void.Parser, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.Scenario), global::Grpc.Testing.Scenario.Parser, new[]{ "Name", "ClientConfig", "NumClients", "ServerConfig", "NumServers", "WarmupSeconds", "BenchmarkSeconds", "SpawnLocalWorkerCount" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.Scenarios), global::Grpc.Testing.Scenarios.Parser, new[]{ "Scenarios_" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ScenarioResultSummary), global::Grpc.Testing.ScenarioResultSummary.Parser, new[]{ "Qps", "QpsPerServerCore", "ServerSystemTime", "ServerUserTime", "ClientSystemTime", "ClientUserTime", "Latency50", "Latency90", "Latency95", "Latency99", "Latency999" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ScenarioResult), global::Grpc.Testing.ScenarioResult.Parser, new[]{ "Scenario", "Latencies", "ClientStats", "ServerStats", "ServerCores", "Summary" }, null, null, null) })); } #endregion @@ -115,27 +115,27 @@ namespace Grpc.Testing { /// Many languages support a basic distinction between using /// sync or async client, and this allows the specification /// </summary> - SYNC_CLIENT = 0, - ASYNC_CLIENT = 1, + [pbr::OriginalName("SYNC_CLIENT")] SyncClient = 0, + [pbr::OriginalName("ASYNC_CLIENT")] AsyncClient = 1, /// <summary> /// used for some language-specific variants /// </summary> - OTHER_CLIENT = 2, + [pbr::OriginalName("OTHER_CLIENT")] OtherClient = 2, } public enum ServerType { - SYNC_SERVER = 0, - ASYNC_SERVER = 1, - ASYNC_GENERIC_SERVER = 2, + [pbr::OriginalName("SYNC_SERVER")] SyncServer = 0, + [pbr::OriginalName("ASYNC_SERVER")] AsyncServer = 1, + [pbr::OriginalName("ASYNC_GENERIC_SERVER")] AsyncGenericServer = 2, /// <summary> /// used for some language-specific variants /// </summary> - OTHER_SERVER = 3, + [pbr::OriginalName("OTHER_SERVER")] OtherServer = 3, } public enum RpcType { - UNARY = 0, - STREAMING = 1, + [pbr::OriginalName("UNARY")] Unary = 0, + [pbr::OriginalName("STREAMING")] Streaming = 1, } #endregion @@ -547,7 +547,7 @@ namespace Grpc.Testing { public string ServerHostOverride { get { return serverHostOverride_; } set { - serverHostOverride_ = pb::Preconditions.CheckNotNull(value, "value"); + serverHostOverride_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -686,7 +686,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "client_type" field.</summary> public const int ClientTypeFieldNumber = 2; - private global::Grpc.Testing.ClientType clientType_ = global::Grpc.Testing.ClientType.SYNC_CLIENT; + private global::Grpc.Testing.ClientType clientType_ = 0; public global::Grpc.Testing.ClientType ClientType { get { return clientType_; } set { @@ -747,7 +747,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "rpc_type" field.</summary> public const int RpcTypeFieldNumber = 8; - private global::Grpc.Testing.RpcType rpcType_ = global::Grpc.Testing.RpcType.UNARY; + private global::Grpc.Testing.RpcType rpcType_ = 0; public global::Grpc.Testing.RpcType RpcType { get { return rpcType_; } set { @@ -819,7 +819,7 @@ namespace Grpc.Testing { public string OtherClientApi { get { return otherClientApi_; } set { - otherClientApi_ = pb::Preconditions.CheckNotNull(value, "value"); + otherClientApi_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -853,12 +853,12 @@ namespace Grpc.Testing { public override int GetHashCode() { int hash = 1; hash ^= serverTargets_.GetHashCode(); - if (ClientType != global::Grpc.Testing.ClientType.SYNC_CLIENT) hash ^= ClientType.GetHashCode(); + if (ClientType != 0) hash ^= ClientType.GetHashCode(); if (securityParams_ != null) hash ^= SecurityParams.GetHashCode(); if (OutstandingRpcsPerChannel != 0) hash ^= OutstandingRpcsPerChannel.GetHashCode(); if (ClientChannels != 0) hash ^= ClientChannels.GetHashCode(); if (AsyncClientThreads != 0) hash ^= AsyncClientThreads.GetHashCode(); - if (RpcType != global::Grpc.Testing.RpcType.UNARY) hash ^= RpcType.GetHashCode(); + if (RpcType != 0) hash ^= RpcType.GetHashCode(); if (loadParams_ != null) hash ^= LoadParams.GetHashCode(); if (payloadConfig_ != null) hash ^= PayloadConfig.GetHashCode(); if (histogramParams_ != null) hash ^= HistogramParams.GetHashCode(); @@ -874,7 +874,7 @@ namespace Grpc.Testing { public void WriteTo(pb::CodedOutputStream output) { serverTargets_.WriteTo(output, _repeated_serverTargets_codec); - if (ClientType != global::Grpc.Testing.ClientType.SYNC_CLIENT) { + if (ClientType != 0) { output.WriteRawTag(16); output.WriteEnum((int) ClientType); } @@ -894,7 +894,7 @@ namespace Grpc.Testing { output.WriteRawTag(56); output.WriteInt32(AsyncClientThreads); } - if (RpcType != global::Grpc.Testing.RpcType.UNARY) { + if (RpcType != 0) { output.WriteRawTag(64); output.WriteEnum((int) RpcType); } @@ -924,7 +924,7 @@ namespace Grpc.Testing { public int CalculateSize() { int size = 0; size += serverTargets_.CalculateSize(_repeated_serverTargets_codec); - if (ClientType != global::Grpc.Testing.ClientType.SYNC_CLIENT) { + if (ClientType != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) ClientType); } if (securityParams_ != null) { @@ -939,7 +939,7 @@ namespace Grpc.Testing { if (AsyncClientThreads != 0) { size += 1 + pb::CodedOutputStream.ComputeInt32Size(AsyncClientThreads); } - if (RpcType != global::Grpc.Testing.RpcType.UNARY) { + if (RpcType != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) RpcType); } if (loadParams_ != null) { @@ -966,7 +966,7 @@ namespace Grpc.Testing { return; } serverTargets_.Add(other.serverTargets_); - if (other.ClientType != global::Grpc.Testing.ClientType.SYNC_CLIENT) { + if (other.ClientType != 0) { ClientType = other.ClientType; } if (other.securityParams_ != null) { @@ -984,7 +984,7 @@ namespace Grpc.Testing { if (other.AsyncClientThreads != 0) { AsyncClientThreads = other.AsyncClientThreads; } - if (other.RpcType != global::Grpc.Testing.RpcType.UNARY) { + if (other.RpcType != 0) { RpcType = other.RpcType; } if (other.loadParams_ != null) { @@ -1515,7 +1515,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "server_type" field.</summary> public const int ServerTypeFieldNumber = 1; - private global::Grpc.Testing.ServerType serverType_ = global::Grpc.Testing.ServerType.SYNC_SERVER; + private global::Grpc.Testing.ServerType serverType_ = 0; public global::Grpc.Testing.ServerType ServerType { get { return serverType_; } set { @@ -1606,7 +1606,7 @@ namespace Grpc.Testing { public string OtherServerApi { get { return otherServerApi_; } set { - otherServerApi_ = pb::Preconditions.CheckNotNull(value, "value"); + otherServerApi_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -1634,7 +1634,7 @@ namespace Grpc.Testing { public override int GetHashCode() { int hash = 1; - if (ServerType != global::Grpc.Testing.ServerType.SYNC_SERVER) hash ^= ServerType.GetHashCode(); + if (ServerType != 0) hash ^= ServerType.GetHashCode(); if (securityParams_ != null) hash ^= SecurityParams.GetHashCode(); if (Port != 0) hash ^= Port.GetHashCode(); if (AsyncServerThreads != 0) hash ^= AsyncServerThreads.GetHashCode(); @@ -1650,7 +1650,7 @@ namespace Grpc.Testing { } public void WriteTo(pb::CodedOutputStream output) { - if (ServerType != global::Grpc.Testing.ServerType.SYNC_SERVER) { + if (ServerType != 0) { output.WriteRawTag(8); output.WriteEnum((int) ServerType); } @@ -1683,7 +1683,7 @@ namespace Grpc.Testing { public int CalculateSize() { int size = 0; - if (ServerType != global::Grpc.Testing.ServerType.SYNC_SERVER) { + if (ServerType != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) ServerType); } if (securityParams_ != null) { @@ -1712,7 +1712,7 @@ namespace Grpc.Testing { if (other == null) { return; } - if (other.ServerType != global::Grpc.Testing.ServerType.SYNC_SERVER) { + if (other.ServerType != 0) { ServerType = other.ServerType; } if (other.securityParams_ != null) { @@ -2436,7 +2436,7 @@ namespace Grpc.Testing { public string Name { get { return name_; } set { - name_ = pb::Preconditions.CheckNotNull(value, "value"); + name_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } diff --git a/src/csharp/Grpc.IntegrationTesting/Empty.cs b/src/csharp/Grpc.IntegrationTesting/Empty.cs index 4323c5a09f..cf1c23fb0f 100644 --- a/src/csharp/Grpc.IntegrationTesting/Empty.cs +++ b/src/csharp/Grpc.IntegrationTesting/Empty.cs @@ -27,8 +27,8 @@ namespace Grpc.Testing { "c3RpbmciBwoFRW1wdHliBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedCodeInfo(null, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.Empty), global::Grpc.Testing.Empty.Parser, null, null, null, null) + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.Empty), global::Grpc.Testing.Empty.Parser, null, null, null, null) })); } #endregion diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 9685cf1837..0089049408 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -61,7 +61,7 @@ <HintPath>..\packages\Google.Apis.Core.1.11.1\lib\net45\Google.Apis.Core.dll</HintPath> </Reference> <Reference Include="Google.Protobuf"> - <HintPath>..\packages\Google.Protobuf.3.0.0-beta2\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> + <HintPath>..\packages\Google.Protobuf.3.0.0-beta3\lib\portable-net45+netcore45+wpa81+wp8\Google.Protobuf.dll</HintPath> </Reference> <Reference Include="Newtonsoft.Json"> <HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath> diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index cff8508631..aea40afee2 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -230,13 +230,13 @@ namespace Grpc.IntegrationTesting Console.WriteLine("running large_unary"); var request = new SimpleRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseSize = 314159, Payload = CreateZerosPayload(271828) }; var response = client.UnaryCall(request); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, response.Payload.Type); Assert.AreEqual(314159, response.Payload.Body.Length); Console.WriteLine("Passed!"); } @@ -265,7 +265,7 @@ namespace Grpc.IntegrationTesting var request = new StreamingOutputCallRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseParameters = { bodySizes.ConvertAll((size) => new ResponseParameters { Size = size }) } }; @@ -274,7 +274,7 @@ namespace Grpc.IntegrationTesting var responseList = await call.ResponseStream.ToListAsync(); foreach (var res in responseList) { - Assert.AreEqual(PayloadType.COMPRESSABLE, res.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, res.Payload.Type); } CollectionAssert.AreEqual(bodySizes, responseList.ConvertAll((item) => item.Payload.Body.Length)); } @@ -289,46 +289,46 @@ namespace Grpc.IntegrationTesting { await call.RequestStream.WriteAsync(new StreamingOutputCallRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseParameters = { new ResponseParameters { Size = 31415 } }, Payload = CreateZerosPayload(27182) }); Assert.IsTrue(await call.ResponseStream.MoveNext()); - Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type); Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); await call.RequestStream.WriteAsync(new StreamingOutputCallRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseParameters = { new ResponseParameters { Size = 9 } }, Payload = CreateZerosPayload(8) }); Assert.IsTrue(await call.ResponseStream.MoveNext()); - Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type); Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length); await call.RequestStream.WriteAsync(new StreamingOutputCallRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseParameters = { new ResponseParameters { Size = 2653 } }, Payload = CreateZerosPayload(1828) }); Assert.IsTrue(await call.ResponseStream.MoveNext()); - Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type); Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length); await call.RequestStream.WriteAsync(new StreamingOutputCallRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseParameters = { new ResponseParameters { Size = 58979 } }, Payload = CreateZerosPayload(45904) }); Assert.IsTrue(await call.ResponseStream.MoveNext()); - Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type); Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length); await call.RequestStream.CompleteAsync(); @@ -357,7 +357,7 @@ namespace Grpc.IntegrationTesting var request = new SimpleRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseSize = 314159, Payload = CreateZerosPayload(271828), FillUsername = true, @@ -367,7 +367,7 @@ namespace Grpc.IntegrationTesting // not setting credentials here because they were set on channel already var response = client.UnaryCall(request); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, response.Payload.Type); Assert.AreEqual(314159, response.Payload.Body.Length); Assert.False(string.IsNullOrEmpty(response.OauthScope)); Assert.True(oauthScope.Contains(response.OauthScope)); @@ -381,7 +381,7 @@ namespace Grpc.IntegrationTesting var request = new SimpleRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseSize = 314159, Payload = CreateZerosPayload(271828), FillUsername = true, @@ -390,7 +390,7 @@ namespace Grpc.IntegrationTesting // not setting credentials here because they were set on channel already var response = client.UnaryCall(request); - Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, response.Payload.Type); Assert.AreEqual(314159, response.Payload.Body.Length); Assert.AreEqual(GetEmailFromServiceAccountFile(), response.Username); Console.WriteLine("Passed!"); @@ -460,19 +460,27 @@ namespace Grpc.IntegrationTesting { await call.RequestStream.WriteAsync(new StreamingOutputCallRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseParameters = { new ResponseParameters { Size = 31415 } }, Payload = CreateZerosPayload(27182) }); Assert.IsTrue(await call.ResponseStream.MoveNext()); - Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type); + Assert.AreEqual(PayloadType.Compressable, call.ResponseStream.Current.Payload.Type); Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length); cts.Cancel(); - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext()); - Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseStream.MoveNext(); + Assert.Fail(); + } + catch (RpcException ex) + { + Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode); + } } Console.WriteLine("Passed!"); } @@ -497,9 +505,16 @@ namespace Grpc.IntegrationTesting // Deadline was reached before write has started. Eat the exception and continue. } - var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext()); - // We can't guarantee the status code always DeadlineExceeded. See issue #2685. - Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); + try + { + await call.ResponseStream.MoveNext(); + Assert.Fail(); + } + catch (RpcException ex) + { + // We can't guarantee the status code always DeadlineExceeded. See issue #2685. + Assert.Contains(ex.Status.StatusCode, new[] { StatusCode.DeadlineExceeded, StatusCode.Internal }); + } } Console.WriteLine("Passed!"); } @@ -511,7 +526,7 @@ namespace Grpc.IntegrationTesting // step 1: test unary call var request = new SimpleRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseSize = 314159, Payload = CreateZerosPayload(271828) }; @@ -530,7 +545,7 @@ namespace Grpc.IntegrationTesting // step 2: test full duplex call var request = new StreamingOutputCallRequest { - ResponseType = PayloadType.COMPRESSABLE, + ResponseType = PayloadType.Compressable, ResponseParameters = { new ResponseParameters { Size = 31415 } }, Payload = CreateZerosPayload(27182) }; @@ -577,9 +592,17 @@ namespace Grpc.IntegrationTesting await call.RequestStream.WriteAsync(request); await call.RequestStream.CompleteAsync(); - var e = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.ToListAsync()); - Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); - Assert.AreEqual(echoStatus.Message, e.Status.Detail); + try + { + // cannot use Assert.ThrowsAsync because it uses Task.Wait and would deadlock. + await call.ResponseStream.ToListAsync(); + Assert.Fail(); + } + catch (RpcException e) + { + Assert.AreEqual(StatusCode.Unknown, e.Status.StatusCode); + Assert.AreEqual(echoStatus.Message, e.Status.Detail); + } } Console.WriteLine("Passed!"); diff --git a/src/csharp/Grpc.IntegrationTesting/Messages.cs b/src/csharp/Grpc.IntegrationTesting/Messages.cs index fcff475941..d42501aa5b 100644 --- a/src/csharp/Grpc.IntegrationTesting/Messages.cs +++ b/src/csharp/Grpc.IntegrationTesting/Messages.cs @@ -55,18 +55,18 @@ namespace Grpc.Testing { "TkUQABIICgRHWklQEAESCwoHREVGTEFURRACYgZwcm90bzM=")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedCodeInfo(new[] {typeof(global::Grpc.Testing.PayloadType), typeof(global::Grpc.Testing.CompressionType), }, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.Payload), global::Grpc.Testing.Payload.Parser, new[]{ "Type", "Body" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.EchoStatus), global::Grpc.Testing.EchoStatus.Parser, new[]{ "Code", "Message" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.SimpleRequest), global::Grpc.Testing.SimpleRequest.Parser, new[]{ "ResponseType", "ResponseSize", "Payload", "FillUsername", "FillOauthScope", "ResponseCompression", "ResponseStatus" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.SimpleResponse), global::Grpc.Testing.SimpleResponse.Parser, new[]{ "Payload", "Username", "OauthScope" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.StreamingInputCallRequest), global::Grpc.Testing.StreamingInputCallRequest.Parser, new[]{ "Payload" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.StreamingInputCallResponse), global::Grpc.Testing.StreamingInputCallResponse.Parser, new[]{ "AggregatedPayloadSize" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ResponseParameters), global::Grpc.Testing.ResponseParameters.Parser, new[]{ "Size", "IntervalUs" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.StreamingOutputCallRequest), global::Grpc.Testing.StreamingOutputCallRequest.Parser, new[]{ "ResponseType", "ResponseParameters", "Payload", "ResponseCompression", "ResponseStatus" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.StreamingOutputCallResponse), global::Grpc.Testing.StreamingOutputCallResponse.Parser, new[]{ "Payload" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ReconnectParams), global::Grpc.Testing.ReconnectParams.Parser, new[]{ "MaxReconnectBackoffMs" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ReconnectInfo), global::Grpc.Testing.ReconnectInfo.Parser, new[]{ "Passed", "BackoffMs" }, null, null, null) + new pbr::GeneratedClrTypeInfo(new[] {typeof(global::Grpc.Testing.PayloadType), typeof(global::Grpc.Testing.CompressionType), }, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.Payload), global::Grpc.Testing.Payload.Parser, new[]{ "Type", "Body" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.EchoStatus), global::Grpc.Testing.EchoStatus.Parser, new[]{ "Code", "Message" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.SimpleRequest), global::Grpc.Testing.SimpleRequest.Parser, new[]{ "ResponseType", "ResponseSize", "Payload", "FillUsername", "FillOauthScope", "ResponseCompression", "ResponseStatus" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.SimpleResponse), global::Grpc.Testing.SimpleResponse.Parser, new[]{ "Payload", "Username", "OauthScope" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.StreamingInputCallRequest), global::Grpc.Testing.StreamingInputCallRequest.Parser, new[]{ "Payload" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.StreamingInputCallResponse), global::Grpc.Testing.StreamingInputCallResponse.Parser, new[]{ "AggregatedPayloadSize" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ResponseParameters), global::Grpc.Testing.ResponseParameters.Parser, new[]{ "Size", "IntervalUs" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.StreamingOutputCallRequest), global::Grpc.Testing.StreamingOutputCallRequest.Parser, new[]{ "ResponseType", "ResponseParameters", "Payload", "ResponseCompression", "ResponseStatus" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.StreamingOutputCallResponse), global::Grpc.Testing.StreamingOutputCallResponse.Parser, new[]{ "Payload" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ReconnectParams), global::Grpc.Testing.ReconnectParams.Parser, new[]{ "MaxReconnectBackoffMs" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ReconnectInfo), global::Grpc.Testing.ReconnectInfo.Parser, new[]{ "Passed", "BackoffMs" }, null, null, null) })); } #endregion @@ -80,15 +80,15 @@ namespace Grpc.Testing { /// <summary> /// Compressable text format. /// </summary> - COMPRESSABLE = 0, + [pbr::OriginalName("COMPRESSABLE")] Compressable = 0, /// <summary> /// Uncompressable binary format. /// </summary> - UNCOMPRESSABLE = 1, + [pbr::OriginalName("UNCOMPRESSABLE")] Uncompressable = 1, /// <summary> /// Randomly chosen from all other formats defined in this enum. /// </summary> - RANDOM = 2, + [pbr::OriginalName("RANDOM")] Random = 2, } /// <summary> @@ -98,9 +98,9 @@ namespace Grpc.Testing { /// <summary> /// No compression /// </summary> - NONE = 0, - GZIP = 1, - DEFLATE = 2, + [pbr::OriginalName("NONE")] None = 0, + [pbr::OriginalName("GZIP")] Gzip = 1, + [pbr::OriginalName("DEFLATE")] Deflate = 2, } #endregion @@ -139,7 +139,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "type" field.</summary> public const int TypeFieldNumber = 1; - private global::Grpc.Testing.PayloadType type_ = global::Grpc.Testing.PayloadType.COMPRESSABLE; + private global::Grpc.Testing.PayloadType type_ = 0; /// <summary> /// The type of data in body. /// </summary> @@ -159,7 +159,7 @@ namespace Grpc.Testing { public pb::ByteString Body { get { return body_; } set { - body_ = pb::Preconditions.CheckNotNull(value, "value"); + body_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -181,7 +181,7 @@ namespace Grpc.Testing { public override int GetHashCode() { int hash = 1; - if (Type != global::Grpc.Testing.PayloadType.COMPRESSABLE) hash ^= Type.GetHashCode(); + if (Type != 0) hash ^= Type.GetHashCode(); if (Body.Length != 0) hash ^= Body.GetHashCode(); return hash; } @@ -191,7 +191,7 @@ namespace Grpc.Testing { } public void WriteTo(pb::CodedOutputStream output) { - if (Type != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (Type != 0) { output.WriteRawTag(8); output.WriteEnum((int) Type); } @@ -203,7 +203,7 @@ namespace Grpc.Testing { public int CalculateSize() { int size = 0; - if (Type != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (Type != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) Type); } if (Body.Length != 0) { @@ -216,7 +216,7 @@ namespace Grpc.Testing { if (other == null) { return; } - if (other.Type != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (other.Type != 0) { Type = other.Type; } if (other.Body.Length != 0) { @@ -293,7 +293,7 @@ namespace Grpc.Testing { public string Message { get { return message_; } set { - message_ = pb::Preconditions.CheckNotNull(value, "value"); + message_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -417,7 +417,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "response_type" field.</summary> public const int ResponseTypeFieldNumber = 1; - private global::Grpc.Testing.PayloadType responseType_ = global::Grpc.Testing.PayloadType.COMPRESSABLE; + private global::Grpc.Testing.PayloadType responseType_ = 0; /// <summary> /// Desired payload type in the response from the server. /// If response_type is RANDOM, server randomly chooses one from other formats. @@ -484,7 +484,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "response_compression" field.</summary> public const int ResponseCompressionFieldNumber = 6; - private global::Grpc.Testing.CompressionType responseCompression_ = global::Grpc.Testing.CompressionType.NONE; + private global::Grpc.Testing.CompressionType responseCompression_ = 0; /// <summary> /// Compression algorithm to be used by the server for the response (stream) /// </summary> @@ -531,12 +531,12 @@ namespace Grpc.Testing { public override int GetHashCode() { int hash = 1; - if (ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) hash ^= ResponseType.GetHashCode(); + if (ResponseType != 0) hash ^= ResponseType.GetHashCode(); if (ResponseSize != 0) hash ^= ResponseSize.GetHashCode(); if (payload_ != null) hash ^= Payload.GetHashCode(); if (FillUsername != false) hash ^= FillUsername.GetHashCode(); if (FillOauthScope != false) hash ^= FillOauthScope.GetHashCode(); - if (ResponseCompression != global::Grpc.Testing.CompressionType.NONE) hash ^= ResponseCompression.GetHashCode(); + if (ResponseCompression != 0) hash ^= ResponseCompression.GetHashCode(); if (responseStatus_ != null) hash ^= ResponseStatus.GetHashCode(); return hash; } @@ -546,7 +546,7 @@ namespace Grpc.Testing { } public void WriteTo(pb::CodedOutputStream output) { - if (ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (ResponseType != 0) { output.WriteRawTag(8); output.WriteEnum((int) ResponseType); } @@ -566,7 +566,7 @@ namespace Grpc.Testing { output.WriteRawTag(40); output.WriteBool(FillOauthScope); } - if (ResponseCompression != global::Grpc.Testing.CompressionType.NONE) { + if (ResponseCompression != 0) { output.WriteRawTag(48); output.WriteEnum((int) ResponseCompression); } @@ -578,7 +578,7 @@ namespace Grpc.Testing { public int CalculateSize() { int size = 0; - if (ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (ResponseType != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) ResponseType); } if (ResponseSize != 0) { @@ -593,7 +593,7 @@ namespace Grpc.Testing { if (FillOauthScope != false) { size += 1 + 1; } - if (ResponseCompression != global::Grpc.Testing.CompressionType.NONE) { + if (ResponseCompression != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) ResponseCompression); } if (responseStatus_ != null) { @@ -606,7 +606,7 @@ namespace Grpc.Testing { if (other == null) { return; } - if (other.ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (other.ResponseType != 0) { ResponseType = other.ResponseType; } if (other.ResponseSize != 0) { @@ -624,7 +624,7 @@ namespace Grpc.Testing { if (other.FillOauthScope != false) { FillOauthScope = other.FillOauthScope; } - if (other.ResponseCompression != global::Grpc.Testing.CompressionType.NONE) { + if (other.ResponseCompression != 0) { ResponseCompression = other.ResponseCompression; } if (other.responseStatus_ != null) { @@ -737,7 +737,7 @@ namespace Grpc.Testing { public string Username { get { return username_; } set { - username_ = pb::Preconditions.CheckNotNull(value, "value"); + username_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -750,7 +750,7 @@ namespace Grpc.Testing { public string OauthScope { get { return oauthScope_; } set { - oauthScope_ = pb::Preconditions.CheckNotNull(value, "value"); + oauthScope_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -1259,7 +1259,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "response_type" field.</summary> public const int ResponseTypeFieldNumber = 1; - private global::Grpc.Testing.PayloadType responseType_ = global::Grpc.Testing.PayloadType.COMPRESSABLE; + private global::Grpc.Testing.PayloadType responseType_ = 0; /// <summary> /// Desired payload type in the response from the server. /// If response_type is RANDOM, the payload from each response in the stream @@ -1300,7 +1300,7 @@ namespace Grpc.Testing { /// <summary>Field number for the "response_compression" field.</summary> public const int ResponseCompressionFieldNumber = 6; - private global::Grpc.Testing.CompressionType responseCompression_ = global::Grpc.Testing.CompressionType.NONE; + private global::Grpc.Testing.CompressionType responseCompression_ = 0; /// <summary> /// Compression algorithm to be used by the server for the response (stream) /// </summary> @@ -1345,10 +1345,10 @@ namespace Grpc.Testing { public override int GetHashCode() { int hash = 1; - if (ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) hash ^= ResponseType.GetHashCode(); + if (ResponseType != 0) hash ^= ResponseType.GetHashCode(); hash ^= responseParameters_.GetHashCode(); if (payload_ != null) hash ^= Payload.GetHashCode(); - if (ResponseCompression != global::Grpc.Testing.CompressionType.NONE) hash ^= ResponseCompression.GetHashCode(); + if (ResponseCompression != 0) hash ^= ResponseCompression.GetHashCode(); if (responseStatus_ != null) hash ^= ResponseStatus.GetHashCode(); return hash; } @@ -1358,7 +1358,7 @@ namespace Grpc.Testing { } public void WriteTo(pb::CodedOutputStream output) { - if (ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (ResponseType != 0) { output.WriteRawTag(8); output.WriteEnum((int) ResponseType); } @@ -1367,7 +1367,7 @@ namespace Grpc.Testing { output.WriteRawTag(26); output.WriteMessage(Payload); } - if (ResponseCompression != global::Grpc.Testing.CompressionType.NONE) { + if (ResponseCompression != 0) { output.WriteRawTag(48); output.WriteEnum((int) ResponseCompression); } @@ -1379,14 +1379,14 @@ namespace Grpc.Testing { public int CalculateSize() { int size = 0; - if (ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (ResponseType != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) ResponseType); } size += responseParameters_.CalculateSize(_repeated_responseParameters_codec); if (payload_ != null) { size += 1 + pb::CodedOutputStream.ComputeMessageSize(Payload); } - if (ResponseCompression != global::Grpc.Testing.CompressionType.NONE) { + if (ResponseCompression != 0) { size += 1 + pb::CodedOutputStream.ComputeEnumSize((int) ResponseCompression); } if (responseStatus_ != null) { @@ -1399,7 +1399,7 @@ namespace Grpc.Testing { if (other == null) { return; } - if (other.ResponseType != global::Grpc.Testing.PayloadType.COMPRESSABLE) { + if (other.ResponseType != 0) { ResponseType = other.ResponseType; } responseParameters_.Add(other.responseParameters_); @@ -1409,7 +1409,7 @@ namespace Grpc.Testing { } Payload.MergeFrom(other.Payload); } - if (other.ResponseCompression != global::Grpc.Testing.CompressionType.NONE) { + if (other.ResponseCompression != 0) { ResponseCompression = other.ResponseCompression; } if (other.responseStatus_ != null) { diff --git a/src/csharp/Grpc.IntegrationTesting/Metrics.cs b/src/csharp/Grpc.IntegrationTesting/Metrics.cs index 3163949d32..8f31fbc2a9 100644 --- a/src/csharp/Grpc.IntegrationTesting/Metrics.cs +++ b/src/csharp/Grpc.IntegrationTesting/Metrics.cs @@ -34,10 +34,10 @@ namespace Grpc.Testing { "dWdlUmVzcG9uc2ViBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedCodeInfo(null, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.GaugeResponse), global::Grpc.Testing.GaugeResponse.Parser, new[]{ "Name", "LongValue", "DoubleValue", "StringValue" }, new[]{ "Value" }, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.GaugeRequest), global::Grpc.Testing.GaugeRequest.Parser, new[]{ "Name" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.EmptyMessage), global::Grpc.Testing.EmptyMessage.Parser, null, null, null, null) + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.GaugeResponse), global::Grpc.Testing.GaugeResponse.Parser, new[]{ "Name", "LongValue", "DoubleValue", "StringValue" }, new[]{ "Value" }, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.GaugeRequest), global::Grpc.Testing.GaugeRequest.Parser, new[]{ "Name" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.EmptyMessage), global::Grpc.Testing.EmptyMessage.Parser, null, null, null, null) })); } #endregion @@ -92,7 +92,7 @@ namespace Grpc.Testing { public string Name { get { return name_; } set { - name_ = pb::Preconditions.CheckNotNull(value, "value"); + name_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } @@ -121,7 +121,7 @@ namespace Grpc.Testing { public string StringValue { get { return valueCase_ == ValueOneofCase.StringValue ? (string) value_ : ""; } set { - value_ = pb::Preconditions.CheckNotNull(value, "value"); + value_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); valueCase_ = ValueOneofCase.StringValue; } } @@ -299,7 +299,7 @@ namespace Grpc.Testing { public string Name { get { return name_; } set { - name_ = pb::Preconditions.CheckNotNull(value, "value"); + name_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); } } diff --git a/src/csharp/Grpc.IntegrationTesting/Payloads.cs b/src/csharp/Grpc.IntegrationTesting/Payloads.cs index 663f625aa7..3ad7a44f4b 100644 --- a/src/csharp/Grpc.IntegrationTesting/Payloads.cs +++ b/src/csharp/Grpc.IntegrationTesting/Payloads.cs @@ -34,11 +34,11 @@ namespace Grpc.Testing { "aW5nLkNvbXBsZXhQcm90b1BhcmFtc0gAQgkKB3BheWxvYWRiBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedCodeInfo(null, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ByteBufferParams), global::Grpc.Testing.ByteBufferParams.Parser, new[]{ "ReqSize", "RespSize" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.SimpleProtoParams), global::Grpc.Testing.SimpleProtoParams.Parser, new[]{ "ReqSize", "RespSize" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ComplexProtoParams), global::Grpc.Testing.ComplexProtoParams.Parser, null, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.PayloadConfig), global::Grpc.Testing.PayloadConfig.Parser, new[]{ "BytebufParams", "SimpleParams", "ComplexParams" }, new[]{ "Payload" }, null, null) + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ByteBufferParams), global::Grpc.Testing.ByteBufferParams.Parser, new[]{ "ReqSize", "RespSize" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.SimpleProtoParams), global::Grpc.Testing.SimpleProtoParams.Parser, new[]{ "ReqSize", "RespSize" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ComplexProtoParams), global::Grpc.Testing.ComplexProtoParams.Parser, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.PayloadConfig), global::Grpc.Testing.PayloadConfig.Parser, new[]{ "BytebufParams", "SimpleParams", "ComplexParams" }, new[]{ "Payload" }, null, null) })); } #endregion diff --git a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs index 13ab5a25ab..b2f2e4d691 100644 --- a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs @@ -55,7 +55,7 @@ namespace Grpc.IntegrationTesting { var serverConfig = new ServerConfig { - ServerType = ServerType.ASYNC_SERVER + ServerType = ServerType.AsyncServer }; serverRunner = ServerRunners.CreateStarted(serverConfig); } @@ -75,7 +75,7 @@ namespace Grpc.IntegrationTesting var config = new ClientConfig { ServerTargets = { string.Format("{0}:{1}", "localhost", serverRunner.BoundPort) }, - RpcType = RpcType.UNARY, + RpcType = RpcType.Unary, LoadParams = new LoadParams { ClosedLoop = new ClosedLoopParams() }, PayloadConfig = new PayloadConfig { diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs index d7859443e0..8689d188ae 100644 --- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs +++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs @@ -77,13 +77,13 @@ namespace Grpc.IntegrationTesting } ServerServiceDefinition service = null; - if (config.ServerType == ServerType.ASYNC_SERVER) + if (config.ServerType == ServerType.AsyncServer) { GrpcPreconditions.CheckArgument(config.PayloadConfig == null, "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server."); service = BenchmarkService.BindService(new BenchmarkServiceImpl()); } - else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER) + else if (config.ServerType == ServerType.AsyncGenericServer) { var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize); service = GenericService.BindHandler(genericService.StreamingCall); diff --git a/src/csharp/Grpc.IntegrationTesting/Services.cs b/src/csharp/Grpc.IntegrationTesting/Services.cs index a8475c1817..e10b45c9a2 100644 --- a/src/csharp/Grpc.IntegrationTesting/Services.cs +++ b/src/csharp/Grpc.IntegrationTesting/Services.cs @@ -39,7 +39,7 @@ namespace Grpc.Testing { "YgZwcm90bzM=")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Grpc.Testing.MessagesReflection.Descriptor, global::Grpc.Testing.ControlReflection.Descriptor, }, - new pbr::GeneratedCodeInfo(null, null)); + new pbr::GeneratedClrTypeInfo(null, null)); } #endregion diff --git a/src/csharp/Grpc.IntegrationTesting/Stats.cs b/src/csharp/Grpc.IntegrationTesting/Stats.cs index 39c00ea88c..304d676113 100644 --- a/src/csharp/Grpc.IntegrationTesting/Stats.cs +++ b/src/csharp/Grpc.IntegrationTesting/Stats.cs @@ -35,11 +35,11 @@ namespace Grpc.Testing { "ZXIYAyABKAESEwoLdGltZV9zeXN0ZW0YBCABKAFiBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { }, - new pbr::GeneratedCodeInfo(null, new pbr::GeneratedCodeInfo[] { - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerStats), global::Grpc.Testing.ServerStats.Parser, new[]{ "TimeElapsed", "TimeUser", "TimeSystem" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.HistogramParams), global::Grpc.Testing.HistogramParams.Parser, new[]{ "Resolution", "MaxPossible" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.HistogramData), global::Grpc.Testing.HistogramData.Parser, new[]{ "Bucket", "MinSeen", "MaxSeen", "Sum", "SumOfSquares", "Count" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientStats), global::Grpc.Testing.ClientStats.Parser, new[]{ "Latencies", "TimeElapsed", "TimeUser", "TimeSystem" }, null, null, null) + new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] { + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ServerStats), global::Grpc.Testing.ServerStats.Parser, new[]{ "TimeElapsed", "TimeUser", "TimeSystem" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.HistogramParams), global::Grpc.Testing.HistogramParams.Parser, new[]{ "Resolution", "MaxPossible" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.HistogramData), global::Grpc.Testing.HistogramData.Parser, new[]{ "Bucket", "MinSeen", "MaxSeen", "Sum", "SumOfSquares", "Count" }, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::Grpc.Testing.ClientStats), global::Grpc.Testing.ClientStats.Parser, new[]{ "Latencies", "TimeElapsed", "TimeUser", "TimeSystem" }, null, null, null) })); } #endregion diff --git a/src/csharp/Grpc.IntegrationTesting/Test.cs b/src/csharp/Grpc.IntegrationTesting/Test.cs index 363f6444ec..9258dc185d 100644 --- a/src/csharp/Grpc.IntegrationTesting/Test.cs +++ b/src/csharp/Grpc.IntegrationTesting/Test.cs @@ -46,7 +46,7 @@ namespace Grpc.Testing { "cnBjLnRlc3RpbmcuUmVjb25uZWN0SW5mb2IGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Grpc.Testing.EmptyReflection.Descriptor, global::Grpc.Testing.MessagesReflection.Descriptor, }, - new pbr::GeneratedCodeInfo(null, null)); + new pbr::GeneratedClrTypeInfo(null, null)); } #endregion diff --git a/src/csharp/Grpc.IntegrationTesting/WorkerServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/WorkerServiceImpl.cs index 80dad9fdd9..c9eca73452 100644 --- a/src/csharp/Grpc.IntegrationTesting/WorkerServiceImpl.cs +++ b/src/csharp/Grpc.IntegrationTesting/WorkerServiceImpl.cs @@ -64,7 +64,7 @@ namespace Grpc.Testing { Stats = runner.GetStats(false), Port = runner.BoundPort, - Cores = 0, // TODO: set number of cores + Cores = Environment.ProcessorCount, }); while (await requestStream.MoveNext()) diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config index 3fef67dca4..3161c5b755 100644 --- a/src/csharp/Grpc.IntegrationTesting/packages.config +++ b/src/csharp/Grpc.IntegrationTesting/packages.config @@ -4,7 +4,7 @@ <package id="CommandLineParser" version="1.9.71" targetFramework="net45" /> <package id="Google.Apis.Auth" version="1.11.1" targetFramework="net45" /> <package id="Google.Apis.Core" version="1.11.1" targetFramework="net45" /> - <package id="Google.Protobuf" version="3.0.0-beta2" targetFramework="net45" /> + <package id="Google.Protobuf" version="3.0.0-beta3" targetFramework="net45" /> <package id="Ix-Async" version="1.2.5" targetFramework="net45" /> <package id="Moq" version="4.2.1510.2205" targetFramework="net45" /> <package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" /> diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat index 28e4262121..1cc63da970 100644 --- a/src/csharp/build_packages.bat +++ b/src/csharp/build_packages.bat @@ -31,7 +31,7 @@ @rem Current package versions set VERSION=0.15.0-dev -set PROTOBUF_VERSION=3.0.0-beta2 +set PROTOBUF_VERSION=3.0.0-beta3 @rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well. set VERSION_WITH_BETA=%VERSION%-beta diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 5b8ff9b819..4beef9ded8 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -806,11 +806,14 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_set_credentials( /* Server */ GPR_EXPORT grpc_server *GPR_CALLTYPE -grpcsharp_server_create(grpc_completion_queue *cq, - const grpc_channel_args *args) { - grpc_server *server = grpc_server_create(args, NULL); +grpcsharp_server_create(const grpc_channel_args *args) { + return grpc_server_create(args, NULL); +} + +GPR_EXPORT void GPR_CALLTYPE +grpcsharp_server_register_completion_queue(grpc_server *server, + grpc_completion_queue *cq) { grpc_server_register_completion_queue(server, cq, NULL); - return server; } GPR_EXPORT int32_t GPR_CALLTYPE |