diff options
-rw-r--r-- | src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs | 7 | ||||
-rw-r--r-- | src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs | 243 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Grpc.Core.csproj | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/GrpcEnvironment.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 63 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 27 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 42 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/INativeCall.cs | 79 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Properties/AssemblyInfo.cs | 7 |
11 files changed, 421 insertions, 65 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index ad4e94a695..b571fe9025 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -65,6 +65,7 @@ </Compile> <Compile Include="ClientBaseTest.cs" /> <Compile Include="ShutdownTest.cs" /> + <Compile Include="Internal\AsyncCallTest.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="ClientServerTest.cs" /> <Compile Include="ServerTest.cs" /> diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 4fdfab5a99..78295cf6d4 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -53,7 +53,7 @@ namespace Grpc.Core.Tests { var env1 = GrpcEnvironment.AddRef(); var env2 = GrpcEnvironment.AddRef(); - Assert.IsTrue(object.ReferenceEquals(env1, env2)); + Assert.AreSame(env1, env2); GrpcEnvironment.Release(); GrpcEnvironment.Release(); } @@ -61,18 +61,21 @@ namespace Grpc.Core.Tests [Test] public void InitializeAfterShutdown() { + Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); + var env1 = GrpcEnvironment.AddRef(); GrpcEnvironment.Release(); var env2 = GrpcEnvironment.AddRef(); GrpcEnvironment.Release(); - Assert.IsFalse(object.ReferenceEquals(env1, env2)); + Assert.AreNotSame(env1, env2); } [Test] public void ReleaseWithoutAddRef() { + Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release()); } diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs new file mode 100644 index 0000000000..141af7760c --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -0,0 +1,243 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Grpc.Core.Internal; +using NUnit.Framework; +using System.Threading.Tasks; + +namespace Grpc.Core.Internal.Tests +{ + public class AsyncCallTest + { + Channel channel; + FakeNativeCall fakeCall; + AsyncCall<string, string> asyncCall; + + [SetUp] + public void Init() + { + channel = new Channel("localhost", Credentials.Insecure); + + fakeCall = new FakeNativeCall(); + + var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions()); + asyncCall = new AsyncCall<string, string>(callDetails, fakeCall); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + } + + [Test] + public void AsyncUnary_CompletionSuccess() + { + var resultTask = asyncCall.UnaryCallAsync("abc"); + fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }); + Assert.IsTrue(resultTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); + } + + [Test] + public void AsyncUnary_CompletionFailure() + { + var resultTask = asyncCall.UnaryCallAsync("abc"); + fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(), null); + + Assert.IsTrue(resultTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + + Assert.AreEqual(StatusCode.Internal, asyncCall.GetStatus().StatusCode); + Assert.IsNull(asyncCall.GetTrailers()); + var ex = Assert.Throws<RpcException>(() => resultTask.GetAwaiter().GetResult()); + Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); + } + + + //[Test] + //public void Duplex_ReceiveEarlyClose() + //{ + // asyncCall.StartDuplexStreamingCall(); + + // fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(new Status(StatusCode.DeadlineExceeded, ""), null)); + + // // TODO: start read... + // Assert.IsTrue(fakeCall.IsDisposed); + //} + + //[Test] + //public void Duplex_ReceiveEarlyCloseWithRead() + //{ + // asyncCall.StartDuplexStreamingCall(); + + // fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(new Status(StatusCode.DeadlineExceeded, ""), null)); + + // var taskSource = new AsyncCompletionTaskSource<string>(); + // asyncCall.StartReadMessage(taskSource.CompletionDelegate); + + // fakeCall.ReceivedMessageHandler(true, new byte[] { 1 } ); + + // // TODO: start read... + // Assert.IsTrue(fakeCall.IsDisposed); + //} + + + internal class FakeNativeCall : INativeCall + { + + public UnaryResponseClientHandler UnaryResponseClientHandler + { + get; + set; + } + + public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler + { + get; + set; + } + + public ReceivedMessageHandler ReceivedMessageHandler + { + get; + set; + } + + public SendCompletionHandler SendCompletionHandler + { + get; + set; + } + + public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler + { + get; + set; + } + + public bool IsCancelled + { + get; + set; + } + + public bool IsDisposed + { + get; + set; + } + + public void Cancel() + { + IsCancelled = true; + } + + public void CancelWithStatus(Status status) + { + IsCancelled = true; + } + + public string GetPeer() + { + return "PEER"; + } + + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + UnaryResponseClientHandler = callback; + } + + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + throw new NotImplementedException(); + } + + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) + { + UnaryResponseClientHandler = callback; + } + + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartReceiveMessage(ReceivedMessageHandler callback) + { + ReceivedMessageHandler = callback; + } + + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + { + SendCompletionHandler = callback; + } + + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartSendCloseFromClient(SendCompletionHandler callback) + { + SendCompletionHandler = callback; + } + + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartServerSide(ReceivedCloseOnServerHandler callback) + { + ReceivedCloseOnServerHandler = callback; + } + + public void Dispose() + { + IsDisposed = true; + } + } + + } + + +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 055aff1444..ad2af17bc7 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -49,6 +49,7 @@ <Compile Include="AsyncDuplexStreamingCall.cs" /> <Compile Include="AsyncServerStreamingCall.cs" /> <Compile Include="IClientStreamWriter.cs" /> + <Compile Include="Internal\INativeCall.cs" /> <Compile Include="IServerStreamWriter.cs" /> <Compile Include="IAsyncStreamWriter.cs" /> <Compile Include="IAsyncStreamReader.cs" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 0a44eead74..b64228558e 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -102,6 +102,14 @@ namespace Grpc.Core } } + internal static int GetRefCount() + { + lock (staticLock) + { + return refCount; + } + } + /// <summary> /// Gets application-wide logger used by gRPC. /// </summary> diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index bb9ba5b8dd..30d60077f0 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -51,6 +51,7 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); readonly CallInvocationDetails<TRequest, TResponse> details; + readonly INativeCall injectedNativeCall; // for testing // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; @@ -61,12 +62,21 @@ namespace Grpc.Core.Internal bool readObserverCompleted; // True if readObserver has already been completed. public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) - : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment) { this.details = callDetails.WithOptions(callDetails.Options.Normalize()); this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. } + /// <summary> + /// This constructor should only be used for testing. + /// </summary> + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) + : this(callDetails) + { + this.injectedNativeCall = injectedNativeCall; + } + // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but // it is reusing fair amount of code in this class, so we are leaving it here. /// <summary> @@ -100,7 +110,7 @@ namespace Grpc.Core.Internal bool success = (ev.success != 0); try { - HandleUnaryResponse(success, ctx); + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage()); } catch (Exception e) { @@ -125,7 +135,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); halfcloseRequested = true; readingDone = true; @@ -152,7 +162,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); readingDone = true; @@ -176,7 +186,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. @@ -201,7 +211,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { @@ -318,18 +328,27 @@ namespace Grpc.Core.Internal private void Initialize(CompletionQueueSafeHandle cq) { + var call = CreateNativeCall(cq); + details.Channel.AddCallReference(this); + InitializeInternal(call); + RegisterCancellationCallback(); + } + + private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) + { + if (injectedNativeCall != null) + { + return injectedNativeCall; // allows injecting a mock INativeCall in tests. + } + var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance; - var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, + return details.Channel.Handle.CreateCall(environment.CompletionRegistry, parentCall, ContextPropagationToken.DefaultMask, cq, details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value)); - - details.Channel.AddCallReference(this); - - InitializeInternal(call); - RegisterCancellationCallback(); } + // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called. private void RegisterCancellationCallback() { @@ -352,14 +371,12 @@ namespace Grpc.Core.Internal /// <summary> /// Handler for unary response completion. /// </summary> - private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx) + private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage) { - var fullStatus = ctx.GetReceivedStatusOnClient(); - lock (myLock) { finished = true; - finishedStatus = fullStatus; + finishedStatus = receivedStatus; halfclosed = true; @@ -368,11 +385,13 @@ namespace Grpc.Core.Internal if (!success) { - unaryResponseTcs.SetException(new RpcException(new Status(StatusCode.Internal, "Internal error occured."))); + var internalError = new Status(StatusCode.Internal, "Internal error occured."); + finishedStatus = new ClientSideStatus(internalError, null); + unaryResponseTcs.SetException(new RpcException(internalError)); return; } - var status = fullStatus.Status; + var status = receivedStatus.Status; if (status.StatusCode != StatusCode.OK) { @@ -382,7 +401,7 @@ namespace Grpc.Core.Internal // TODO: handle deserialization error TResponse msg; - TryDeserialize(ctx.GetReceivedMessage(), out msg); + TryDeserialize(receivedMessage, out msg); unaryResponseTcs.SetResult(msg); } @@ -390,15 +409,13 @@ namespace Grpc.Core.Internal /// <summary> /// Handles receive status completion for calls with streaming response. /// </summary> - private void HandleFinished(bool success, BatchContextSafeHandle ctx) + private void HandleFinished(bool success, ClientSideStatus receivedStatus) { - var fullStatus = ctx.GetReceivedStatusOnClient(); - AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null; lock (myLock) { finished = true; - finishedStatus = fullStatus; + finishedStatus = receivedStatus; origReadCompletionDelegate = readCompletionDelegate; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 1808294f43..7744dbec00 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -54,9 +54,10 @@ namespace Grpc.Core.Internal readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; + protected readonly GrpcEnvironment environment; protected readonly object myLock = new object(); - protected CallSafeHandle call; + protected INativeCall call; protected bool disposed; protected bool started; @@ -74,10 +75,11 @@ namespace Grpc.Core.Internal protected bool initialMetadataSent; protected long streamingWritesCounter; - public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) { this.serializer = Preconditions.CheckNotNull(serializer); this.deserializer = Preconditions.CheckNotNull(deserializer); + this.environment = Preconditions.CheckNotNull(environment); } /// <summary> @@ -114,7 +116,7 @@ namespace Grpc.Core.Internal } } - protected void InitializeInternal(CallSafeHandle call) + protected void InitializeInternal(INativeCall call) { lock (myLock) { @@ -177,7 +179,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null); + bool noMoreSendCompletions = halfclosed || ((cancelRequested || finished) && sendCompletionDelegate == null); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -209,14 +211,15 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!disposed); Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); + Preconditions.CheckState(!finished, "Already finished."); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } protected virtual void CheckReadingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!disposed); Preconditions.CheckState(!errorOccured); + Preconditions.CheckState(!disposed); Preconditions.CheckState(!readingDone, "Stream has already been closed."); Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); @@ -280,7 +283,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles send completion. /// </summary> - protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx) + protected void HandleSendFinished(bool success) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) @@ -304,7 +307,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles halfclose completion. /// </summary> - protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx) + protected void HandleHalfclosed(bool success) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) @@ -329,15 +332,13 @@ namespace Grpc.Core.Internal /// <summary> /// Handles streaming read completion. /// </summary> - protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx) + protected void HandleReadFinished(bool success, byte[] receivedMessage) { - var payload = ctx.GetReceivedMessage(); - AsyncCompletionDelegate<TRead> origCompletionDelegate = null; lock (myLock) { origCompletionDelegate = readCompletionDelegate; - if (payload != null) + if (receivedMessage != null) { readCompletionDelegate = null; } @@ -354,11 +355,11 @@ namespace Grpc.Core.Internal // TODO: handle the case when error occured... - if (payload != null) + if (receivedMessage != null) { // TODO: handle deserialization error TRead msg; - TryDeserialize(payload, out msg); + TryDeserialize(receivedMessage, out msg); FireCompletion(origCompletionDelegate, msg, null); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 6278c0191e..5c47251030 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -49,12 +49,10 @@ namespace Grpc.Core.Internal { readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - readonly GrpcEnvironment environment; readonly Server server; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment) { - this.environment = Preconditions.CheckNotNull(environment); this.server = Preconditions.CheckNotNull(server); } @@ -185,10 +183,8 @@ namespace Grpc.Core.Internal /// <summary> /// Handles the server side close completion. /// </summary> - private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx) + private void HandleFinishedServerside(bool success, bool cancelled) { - bool cancelled = ctx.GetReceivedCloseOnServerCancelled(); - lock (myLock) { finished = true; diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 3cb01e29bd..e1466da65b 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal /// <summary> /// grpc_call from <grpc/grpc.h> /// </summary> - internal class CallSafeHandle : SafeHandleZeroIsInvalid + internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall { public static readonly CallSafeHandle NullInstance = new CallSafeHandle(); @@ -109,10 +109,10 @@ namespace Grpc.Core.Internal this.completionRegistry = completionRegistry; } - public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -123,66 +123,66 @@ namespace Grpc.Core.Internal .CheckOk(); } - public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage())); grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } - public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } - public void StartSendCloseFromClient(BatchCompletionDelegate callback) + public void StartSendCloseFromClient(SendCompletionHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); } - public void StartReceiveMessage(BatchCompletionDelegate callback) + public void StartReceiveMessage(ReceivedMessageHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); grpcsharp_call_recv_message(this, ctx).CheckOk(); } - public void StartServerSide(BatchCompletionDelegate callback) + public void StartServerSide(ReceivedCloseOnServerHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); grpcsharp_call_start_serverside(this, ctx).CheckOk(); } - public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs new file mode 100644 index 0000000000..42028e458c --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -0,0 +1,79 @@ +#region Copyright notice and license +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +namespace Grpc.Core.Internal +{ + internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage); + + internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus); + + internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); + + internal delegate void SendCompletionHandler(bool success); + + internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled); + + /// <summary> + /// Abstraction of a native call object. + /// </summary> + internal interface INativeCall : IDisposable + { + void Cancel(); + + void CancelWithStatus(Grpc.Core.Status status); + + string GetPeer(); + + void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray); + + void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray); + + void StartReceiveMessage(ReceivedMessageHandler callback); + + void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray); + + void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata); + + void StartSendCloseFromClient(SendCompletionHandler callback); + + void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + + void StartServerSide(ReceivedCloseOnServerHandler callback); + } +} diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs index 29db85d7aa..caca080b7f 100644 --- a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs +++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs @@ -16,6 +16,13 @@ using System.Runtime.CompilerServices; "0442bb8e12768722de0b0cb1b15e955b32a11352740ee59f2c94c48edc8e177d1052536b8ac651bce11ce5da3a" + "27fc95aff3dc604a6971417453f9483c7b5e836756d5b271bf8f2403fe186e31956148c03d804487cf642f8cc0" + "71394ee9672dfe5b55ea0f95dfd5a7f77d22c962ccf51320d3")] + +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2,PublicKey=" + + "0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6" + + "c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdc" + + "f9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff6" + + "2abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")] #else [assembly: InternalsVisibleTo("Grpc.Core.Tests")] +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] #endif
\ No newline at end of file |