diff options
Diffstat (limited to 'src/csharp')
-rw-r--r-- | src/csharp/Grpc.Core/Grpc.Core.csproj | 7 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCall.cs | 577 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallBase.cs | 407 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCallServer.cs | 125 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/AsyncCompletion.cs | 95 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs | 9 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs | 16 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/OperationFailedException.cs | 48 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/ServerCallHandler.cs | 34 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Status.cs | 6 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Utils/Preconditions.cs | 113 |
12 files changed, 949 insertions, 489 deletions
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 93d5430591..78b6cdde59 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -51,7 +51,6 @@ <Compile Include="Internal\SafeHandleZeroIsInvalid.cs" /> <Compile Include="Internal\Timespec.cs" /> <Compile Include="Internal\GrpcThreadPool.cs" /> - <Compile Include="Internal\AsyncCall.cs" /> <Compile Include="Internal\ServerSafeHandle.cs" /> <Compile Include="Method.cs" /> <Compile Include="ServerCalls.cs" /> @@ -69,6 +68,12 @@ <Compile Include="Credentials.cs" /> <Compile Include="Internal\ChannelArgsSafeHandle.cs" /> <Compile Include="ChannelArgs.cs" /> + <Compile Include="Internal\AsyncCompletion.cs" /> + <Compile Include="Internal\AsyncCallBase.cs" /> + <Compile Include="Internal\AsyncCallServer.cs" /> + <Compile Include="OperationFailedException.cs" /> + <Compile Include="Internal\AsyncCall.cs" /> + <Compile Include="Utils\Preconditions.cs" /> </ItemGroup> <Choose> <!-- Under older versions of Monodevelop, Choose is not supported and is just diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 6f37b059f7..5ae036298b 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -43,84 +43,47 @@ using Grpc.Core.Utils; namespace Grpc.Core.Internal { /// <summary> - /// Handles native call lifecycle and provides convenience methods. + /// Handles client side native call lifecycle. /// </summary> - internal class AsyncCall<TWrite, TRead> + internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse> { - readonly Func<TWrite, byte[]> serializer; - readonly Func<byte[], TRead> deserializer; - readonly CompletionCallbackDelegate unaryResponseHandler; readonly CompletionCallbackDelegate finishedHandler; - readonly CompletionCallbackDelegate writeFinishedHandler; - readonly CompletionCallbackDelegate readFinishedHandler; - readonly CompletionCallbackDelegate halfclosedHandler; - readonly CompletionCallbackDelegate finishedServersideHandler; - - object myLock = new object(); - GCHandle gchandle; - CallSafeHandle call; - bool disposed; - - bool server; - - bool started; - bool errorOccured; - bool cancelRequested; - bool readingDone; - bool halfcloseRequested; - bool halfclosed; - bool finished; - - // Completion of a pending write if not null. - TaskCompletionSource<object> writeTcs; - - // Completion of a pending read if not null. - TaskCompletionSource<TRead> readTcs; - - // Completion of a pending halfclose if not null. - TaskCompletionSource<object> halfcloseTcs; // Completion of a pending unary response if not null. - TaskCompletionSource<TRead> unaryResponseTcs; + TaskCompletionSource<TResponse> unaryResponseTcs; - // Set after status is received on client. Only used for server streaming and duplex streaming calls. + // Set after status is received. Only used for streaming response calls. Nullable<Status> finishedStatus; - TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); - // For streaming, the reads will be delivered to this observer. - IObserver<TRead> readObserver; + bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer) { - this.serializer = serializer; - this.deserializer = deserializer; - this.unaryResponseHandler = HandleUnaryResponse; - this.finishedHandler = HandleFinished; - this.writeFinishedHandler = HandleWriteFinished; - this.readFinishedHandler = HandleReadFinished; - this.halfclosedHandler = HandleHalfclosed; - this.finishedServersideHandler = HandleFinishedServerside; + this.unaryResponseHandler = CreateBatchCompletionCallback(HandleUnaryResponse); + this.finishedHandler = CreateBatchCompletionCallback(HandleFinished); } public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) { - InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); + var call = CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture); + InitializeInternal(call); } - public void InitializeServer(CallSafeHandle call) - { - InitializeInternal(call, true); - } - - public TRead UnaryCall(Channel channel, String methodName, TWrite msg) + // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but + // it is reusing fair amount of code in this class, so we are leaving it here. + // TODO: for other calls, you need to call Initialize, this methods calls initialize + // on its own, so there's a usage inconsistency. + /// <summary> + /// Blocking unary request - unary response call. + /// </summary> + public TResponse UnaryCall(Channel channel, String methodName, TRequest msg) { using(CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create()) { - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); lock (myLock) { @@ -143,508 +106,200 @@ namespace Grpc.Core.Internal } } - public Task<TRead> UnaryCallAsync(TWrite msg) + /// <summary> + /// Starts a unary request - unary response call. + /// </summary> + public Task<TResponse> UnaryCallAsync(TRequest msg) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; halfcloseRequested = true; readingDone = true; - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); call.StartUnary(payload, unaryResponseHandler); return unaryResponseTcs.Task; } } - public Task<TRead> ClientStreamingCallAsync() + /// <summary> + /// Starts a streamed request - unary response call. + /// Use StartSendMessage and StartSendCloseFromClient to stream requests. + /// </summary> + public Task<TResponse> ClientStreamingCallAsync() { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; readingDone = true; - unaryResponseTcs = new TaskCompletionSource<TRead>(); + unaryResponseTcs = new TaskCompletionSource<TResponse>(); call.StartClientStreaming(unaryResponseHandler); return unaryResponseTcs.Task; } } - public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver) + /// <summary> + /// Starts a unary request - streamed response call. + /// </summary> + public void StartServerStreamingCall(TRequest msg, IObserver<TResponse> readObserver) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; halfcloseRequested = true; halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. this.readObserver = readObserver; - // TODO: handle serialization error... - byte[] payload = serializer(msg); + byte[] payload = UnsafeSerialize(msg); call.StartServerStreaming(payload, finishedHandler); - ReceiveMessageAsync(); + StartReceiveMessage(); } } - public void StartDuplexStreamingCall(IObserver<TRead> readObserver) + /// <summary> + /// Starts a streaming request - streaming response call. + /// Use StartSendMessage and StartSendCloseFromClient to stream requests. + /// </summary> + public void StartDuplexStreamingCall(IObserver<TResponse> readObserver) { lock (myLock) { + Preconditions.CheckNotNull(call); + started = true; this.readObserver = readObserver; call.StartDuplexStreaming(finishedHandler); - ReceiveMessageAsync(); + StartReceiveMessage(); } } - public Task ServerSideUnaryRequestCallAsync() - { - lock (myLock) - { - started = true; - call.StartServerSide(finishedServersideHandler); - return finishedServersideTcs.Task; - } - } - - public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver) - { - lock (myLock) - { - started = true; - call.StartServerSide(finishedServersideHandler); - - if (this.readObserver != null) - { - throw new InvalidOperationException("Already registered an observer."); - } - this.readObserver = readObserver; - ReceiveMessageAsync(); - - return finishedServersideTcs.Task; - } - } - - public Task SendMessageAsync(TWrite msg) + /// <summary> + /// Sends a streaming request. Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendMessage(TRequest msg, AsyncCompletionDelegate completionDelegate) { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } - - if (writeTcs != null) - { - throw new InvalidOperationException("Only one write can be pending at a time"); - } - - // TODO: wrap serialization... - byte[] payload = serializer(msg); - - call.StartSendMessage(payload, writeFinishedHandler); - writeTcs = new TaskCompletionSource<object>(); - return writeTcs.Task; - } + StartSendMessageInternal(msg, completionDelegate); } - public Task SendCloseFromClientAsync() + /// <summary> + /// Sends halfclose, indicating client is done with streaming requests. + /// Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendCloseFromClient(AsyncCompletionDelegate completionDelegate) { lock (myLock) { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckSendingAllowed(); call.StartSendCloseFromClient(halfclosedHandler); halfcloseRequested = true; - halfcloseTcs = new TaskCompletionSource<object>(); - return halfcloseTcs.Task; - } - } - - public Task SendStatusFromServerAsync(Status status) - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (halfcloseRequested) - { - throw new InvalidOperationException("Already halfclosed."); - } - - call.StartSendStatusFromServer(status, halfclosedHandler); - halfcloseRequested = true; - halfcloseTcs = new TaskCompletionSource<object>(); - return halfcloseTcs.Task; + sendCompletionDelegate = completionDelegate; } } - public Task<TRead> ReceiveMessageAsync() + /// <summary> + /// On client-side, we only fire readObserver.OnCompleted once all messages have been read + /// and status has been received. + /// </summary> + protected override void CompleteReadObserver() { - lock (myLock) + if (readingDone && finishedStatus.HasValue) { - CheckNotDisposed(); - CheckStarted(); - CheckNoError(); - - if (readingDone) - { - throw new InvalidOperationException("Already read the last message."); - } - - if (readTcs != null) + bool shouldComplete; + lock (myLock) { - throw new InvalidOperationException("Only one read can be pending at a time"); + shouldComplete = !readObserverCompleted; + readObserverCompleted = true; } - call.StartReceiveMessage(readFinishedHandler); - - readTcs = new TaskCompletionSource<TRead>(); - return readTcs.Task; - } - } - - public void Cancel() - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - cancelRequested = true; - } - // grpc_call_cancel is threadsafe - call.Cancel(); - } - - public void CancelWithStatus(Status status) - { - lock (myLock) - { - CheckNotDisposed(); - CheckStarted(); - cancelRequested = true; - } - // grpc_call_cancel_with_status is threadsafe - call.CancelWithStatus(status); - } - - private void InitializeInternal(CallSafeHandle call, bool server) - { - lock (myLock) - { - // Make sure this object and the delegated held by it will not be garbage collected - // before we release this handle. - gchandle = GCHandle.Alloc(this); - this.call = call; - this.server = server; - } - } - - private void CheckStarted() - { - if (!started) - { - throw new InvalidOperationException("Call not started"); - } - } - - private void CheckNotDisposed() - { - if (disposed) - { - throw new InvalidOperationException("Call has already been disposed."); - } - } - - private void CheckNoError() - { - if (errorOccured) - { - throw new InvalidOperationException("Error occured when processing call."); - } - } - - private bool ReleaseResourcesIfPossible() - { - if (!disposed && call != null) - { - if (halfclosed && readingDone && finished) + if (shouldComplete) { - ReleaseResources(); - return true; + var status = finishedStatus.Value; + if (status.StatusCode != StatusCode.OK) + { + FireReadObserverOnError(new RpcException(status)); + } + else + { + FireReadObserverOnCompleted(); + } } } - return false; - } - - private void ReleaseResources() - { - if (call != null) { - call.Dispose(); - } - gchandle.Free(); - disposed = true; - } - - private void CompleteStreamObserver(Status status) - { - if (status.StatusCode != StatusCode.OK) - { - // TODO: wrap to handle exceptions; - readObserver.OnError(new RpcException(status)); - } else { - // TODO: wrap to handle exceptions; - readObserver.OnCompleted(); - } } /// <summary> /// Handler for unary response completion. /// </summary> - private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + private void HandleUnaryResponse(bool wasError, BatchContextSafeHandleNotOwned ctx) { - try + lock(myLock) { - TaskCompletionSource<TRead> tcs; - lock(myLock) - { - finished = true; - halfclosed = true; - tcs = unaryResponseTcs; - - ReleaseResourcesIfPossible(); - } - - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + finished = true; + halfclosed = true; - if (error != GRPCOpError.GRPC_OP_OK) - { - tcs.SetException(new RpcException( - new Status(StatusCode.Internal, "Internal error occured.") - )); - return; - } - - var status = ctx.GetReceivedStatus(); - if (status.StatusCode != StatusCode.OK) - { - tcs.SetException(new RpcException(status)); - return; - } - - // TODO: handle deserialize error... - var msg = deserializer(ctx.GetReceivedMessage()); - tcs.SetResult(msg); - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); + ReleaseResourcesIfPossible(); } - } - - private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - TaskCompletionSource<object> oldTcs = null; - lock (myLock) - { - oldTcs = writeTcs; - writeTcs = null; - } - - if (errorOccured) - { - // TODO: use the right type of exception... - oldTcs.SetException(new Exception("Write failed")); - } - else - { - // TODO: where does the continuation run? - oldTcs.SetResult(null); - } - } - catch(Exception e) + if (wasError) { - Console.WriteLine("Caught exception in a native handler: " + e); + unaryResponseTcs.SetException(new RpcException( + new Status(StatusCode.Internal, "Internal error occured.") + )); + return; } - } - - private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - lock (myLock) - { - halfclosed = true; - ReleaseResourcesIfPossible(); - } - - if (error != GRPCOpError.GRPC_OP_OK) - { - halfcloseTcs.SetException(new Exception("Halfclose failed")); - - } - else - { - halfcloseTcs.SetResult(null); - } - } - catch(Exception e) + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.OK) { - Console.WriteLine("Caught exception in a native handler: " + e); + unaryResponseTcs.SetException(new RpcException(status)); + return; } - } - - private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) - { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - var payload = ctx.GetReceivedMessage(); - - TaskCompletionSource<TRead> oldTcs = null; - IObserver<TRead> observer = null; - - Nullable<Status> status = null; - - lock (myLock) - { - oldTcs = readTcs; - readTcs = null; - if (payload == null) - { - readingDone = true; - } - observer = readObserver; - status = finishedStatus; - - ReleaseResourcesIfPossible(); - } - - // TODO: wrap deserialization... - TRead msg = payload != null ? deserializer(payload) : default(TRead); - oldTcs.SetResult(msg); + // TODO: handle deserialization error + TResponse msg; + TryDeserialize(ctx.GetReceivedMessage(), out msg); - // TODO: make sure we deliver reads in the right order. - - if (observer != null) - { - if (payload != null) - { - // TODO: wrap to handle exceptions - observer.OnNext(msg); - - // start a new read - ReceiveMessageAsync(); - } - else - { - if (!server) - { - if (status.HasValue) - { - CompleteStreamObserver(status.Value); - } - } - else - { - // TODO: wrap to handle exceptions.. - observer.OnCompleted(); - } - // TODO: completeStreamObserver serverside... - } - } - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } + unaryResponseTcs.SetResult(msg); } - private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) + /// <summary> + /// Handles receive status completion for calls with streaming response. + /// </summary> + private void HandleFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) { - try - { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - var status = ctx.GetReceivedStatus(); - - bool wasReadingDone; - - lock (myLock) - { - finished = true; - finishedStatus = status; - - wasReadingDone = readingDone; - - ReleaseResourcesIfPossible(); - } - - if (wasReadingDone) { - CompleteStreamObserver(status); - } - - } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } - } + var status = ctx.GetReceivedStatus(); - private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) - { - try + lock (myLock) { - var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); - - lock(myLock) - { - finished = true; - - // TODO: because of the way server calls are implemented, we need to set - // reading done to true here. Should be fixed in the future. - readingDone = true; - - ReleaseResourcesIfPossible(); - } - // TODO: handle error ... - - finishedServersideTcs.SetResult(null); + finished = true; + finishedStatus = status; + ReleaseResourcesIfPossible(); } - catch(Exception e) - { - Console.WriteLine("Caught exception in a native handler: " + e); - } + + CompleteReadObserver(); } } }
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs new file mode 100644 index 0000000000..44d66b394c --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -0,0 +1,407 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Base for handling both client side and server side calls. + /// Handles native call lifecycle and provides convenience methods. + /// </summary> + internal abstract class AsyncCallBase<TWrite, TRead> + { + readonly Func<TWrite, byte[]> serializer; + readonly Func<byte[], TRead> deserializer; + + protected readonly CompletionCallbackDelegate sendFinishedHandler; + protected readonly CompletionCallbackDelegate readFinishedHandler; + protected readonly CompletionCallbackDelegate halfclosedHandler; + + protected readonly object myLock = new object(); + + protected GCHandle gchandle; + protected CallSafeHandle call; + protected bool disposed; + + protected bool started; + protected bool errorOccured; + protected bool cancelRequested; + + protected AsyncCompletionDelegate sendCompletionDelegate; // Completion of a pending send or sendclose if not null. + protected bool readPending; // True if there is a read in progress. + protected bool readingDone; + protected bool halfcloseRequested; + protected bool halfclosed; + protected bool finished; // True if close has been received from the peer. + + // Streaming reads will be delivered to this observer. For a call that only does unary read it may remain null. + protected IObserver<TRead> readObserver; + + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + { + this.serializer = Preconditions.CheckNotNull(serializer); + this.deserializer = Preconditions.CheckNotNull(deserializer); + + this.sendFinishedHandler = CreateBatchCompletionCallback(HandleSendFinished); + this.readFinishedHandler = CreateBatchCompletionCallback(HandleReadFinished); + this.halfclosedHandler = CreateBatchCompletionCallback(HandleHalfclosed); + } + + /// <summary> + /// Requests cancelling the call. + /// </summary> + public void Cancel() + { + lock (myLock) + { + Preconditions.CheckState(started); + cancelRequested = true; + + if (!disposed) + { + call.Cancel(); + } + } + } + + /// <summary> + /// Requests cancelling the call with given status. + /// </summary> + public void CancelWithStatus(Status status) + { + lock (myLock) + { + Preconditions.CheckState(started); + cancelRequested = true; + + if (!disposed) + { + call.CancelWithStatus(status); + } + } + } + + protected void InitializeInternal(CallSafeHandle call) + { + lock (myLock) + { + // Make sure this object and the delegated held by it will not be garbage collected + // before we release this handle. + gchandle = GCHandle.Alloc(this); + this.call = call; + } + } + + /// <summary> + /// Initiates sending a message. Only once send operation can be active at a time. + /// completionDelegate is invoked upon completion. + /// </summary> + protected void StartSendMessageInternal(TWrite msg, AsyncCompletionDelegate completionDelegate) + { + byte[] payload = UnsafeSerialize(msg); + + lock (myLock) + { + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckSendingAllowed(); + + call.StartSendMessage(payload, sendFinishedHandler); + sendCompletionDelegate = completionDelegate; + } + } + + /// <summary> + /// Requests receiving a next message. + /// </summary> + protected void StartReceiveMessage() + { + lock (myLock) + { + Preconditions.CheckState(started); + Preconditions.CheckState(!disposed); + Preconditions.CheckState(!errorOccured); + + Preconditions.CheckState(!readingDone); + Preconditions.CheckState(!readPending); + + call.StartReceiveMessage(readFinishedHandler); + readPending = true; + } + } + + /// <summary> + /// Default behavior just completes the read observer, but more sofisticated behavior might be required + /// by subclasses. + /// </summary> + protected virtual void CompleteReadObserver() + { + FireReadObserverOnCompleted(); + } + + /// <summary> + /// If there are no more pending actions and no new actions can be started, releases + /// the underlying native resources. + /// </summary> + protected bool ReleaseResourcesIfPossible() + { + if (!disposed && call != null) + { + if (halfclosed && readingDone && finished) + { + ReleaseResources(); + return true; + } + } + return false; + } + + private void ReleaseResources() + { + if (call != null) + { + call.Dispose(); + } + gchandle.Free(); + disposed = true; + } + + protected void CheckSendingAllowed() + { + Preconditions.CheckState(started); + Preconditions.CheckState(!disposed); + Preconditions.CheckState(!errorOccured); + + Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); + Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); + } + + protected byte[] UnsafeSerialize(TWrite msg) + { + return serializer(msg); + } + + protected bool TrySerialize(TWrite msg, out byte[] payload) + { + try + { + payload = serializer(msg); + return true; + } + catch(Exception) + { + Console.WriteLine("Exception occured while trying to serialize message"); + payload = null; + return false; + } + } + + protected bool TryDeserialize(byte[] payload, out TRead msg) + { + try + { + msg = deserializer(payload); + return true; + } + catch(Exception) + { + Console.WriteLine("Exception occured while trying to deserialize message"); + msg = default(TRead); + return false; + } + } + + protected void FireReadObserverOnNext(TRead value) + { + try + { + readObserver.OnNext(value); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking readObserver.OnNext: " + e); + } + } + + protected void FireReadObserverOnCompleted() + { + try + { + readObserver.OnCompleted(); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking readObserver.OnCompleted: " + e); + } + } + + protected void FireReadObserverOnError(Exception error) + { + try + { + readObserver.OnError(error); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking readObserver.OnError: " + e); + } + } + + protected void FireCompletion(AsyncCompletionDelegate completionDelegate, Exception error) + { + try + { + completionDelegate(error); + } + catch(Exception e) + { + Console.WriteLine("Exception occured while invoking completion delegate: " + e); + } + } + + /// <summary> + /// Creates completion callback delegate that wraps the batch completion handler in a try catch block to + /// prevent propagating exceptions accross managed/unmanaged boundary. + /// </summary> + protected CompletionCallbackDelegate CreateBatchCompletionCallback(Action<bool, BatchContextSafeHandleNotOwned> handler) + { + return new CompletionCallbackDelegate( (error, batchContextPtr) => { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + bool wasError = (error != GRPCOpError.GRPC_OP_OK); + handler(wasError, ctx); + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + }); + } + + /// <summary> + /// Handles send completion. + /// </summary> + private void HandleSendFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + AsyncCompletionDelegate origCompletionDelegate = null; + lock (myLock) + { + origCompletionDelegate = sendCompletionDelegate; + sendCompletionDelegate = null; + + ReleaseResourcesIfPossible(); + } + + if (wasError) + { + FireCompletion(origCompletionDelegate, new OperationFailedException("Send failed")); + } + else + { + FireCompletion(origCompletionDelegate, null); + } + } + + /// <summary> + /// Handles halfclose completion. + /// </summary> + private void HandleHalfclosed(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + AsyncCompletionDelegate origCompletionDelegate = null; + lock (myLock) + { + halfclosed = true; + origCompletionDelegate = sendCompletionDelegate; + sendCompletionDelegate = null; + + ReleaseResourcesIfPossible(); + } + + if (wasError) + { + FireCompletion(origCompletionDelegate, new OperationFailedException("Halfclose failed")); + } + else + { + FireCompletion(origCompletionDelegate, null); + } + + } + + /// <summary> + /// Handles streaming read completion. + /// </summary> + private void HandleReadFinished(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + var payload = ctx.GetReceivedMessage(); + + lock (myLock) + { + readPending = false; + if (payload == null) + { + readingDone = true; + } + + ReleaseResourcesIfPossible(); + } + + // TODO: handle the case when error occured... + + if (payload != null) + { + // TODO: handle deserialization error + TRead msg; + TryDeserialize(payload, out msg); + + FireReadObserverOnNext(msg); + + // Start a new read. The current one has already been delivered, + // so correct ordering of reads is assured. + StartReceiveMessage(); + } + else + { + CompleteReadObserver(); + } + } + } +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs new file mode 100644 index 0000000000..d3a2be553f --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -0,0 +1,125 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Handles server side native call lifecycle. + /// </summary> + internal class AsyncCallServer<TRequest, TResponse> : AsyncCallBase<TResponse, TRequest> + { + readonly CompletionCallbackDelegate finishedServersideHandler; + readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); + + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer) : base(serializer, deserializer) + { + this.finishedServersideHandler = CreateBatchCompletionCallback(HandleFinishedServerside); + } + + public void Initialize(CallSafeHandle call) + { + InitializeInternal(call); + } + + /// <summary> + /// Starts a server side call. Currently, all server side calls are implemented as duplex + /// streaming call and they are adapted to the appropriate streaming arity. + /// </summary> + public Task ServerSideCallAsync(IObserver<TRequest> readObserver) + { + lock (myLock) + { + Preconditions.CheckNotNull(call); + + started = true; + this.readObserver = readObserver; + + call.StartServerSide(finishedServersideHandler); + StartReceiveMessage(); + return finishedServersideTcs.Task; + } + } + + /// <summary> + /// Sends a streaming response. Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendMessage(TResponse msg, AsyncCompletionDelegate completionDelegate) + { + StartSendMessageInternal(msg, completionDelegate); + } + + /// <summary> + /// Sends call result status, also indicating server is done with streaming responses. + /// Only one pending send action is allowed at any given time. + /// completionDelegate is called when the operation finishes. + /// </summary> + public void StartSendStatusFromServer(Status status, AsyncCompletionDelegate completionDelegate) + { + lock (myLock) + { + Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null"); + CheckSendingAllowed(); + + call.StartSendStatusFromServer(status, halfclosedHandler); + halfcloseRequested = true; + sendCompletionDelegate = completionDelegate; + } + } + + /// <summary> + /// Handles the server side close completion. + /// </summary> + private void HandleFinishedServerside(bool wasError, BatchContextSafeHandleNotOwned ctx) + { + lock (myLock) + { + finished = true; + + ReleaseResourcesIfPossible(); + } + // TODO: handle error ... + + finishedServersideTcs.SetResult(null); + } + } +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs new file mode 100644 index 0000000000..b78bb497fa --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCompletion.cs @@ -0,0 +1,95 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; +using Grpc.Core.Utils; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// If error != null, there's been an error or operation has been cancelled. + /// </summary> + internal delegate void AsyncCompletionDelegate(Exception error); + + /// <summary> + /// Helper for transforming AsyncCompletionDelegate into full-fledged Task. + /// </summary> + internal class AsyncCompletionTaskSource + { + readonly TaskCompletionSource<object> tcs = new TaskCompletionSource<object>(); + readonly AsyncCompletionDelegate completionDelegate; + + public AsyncCompletionTaskSource() + { + completionDelegate = new AsyncCompletionDelegate(HandleCompletion); + } + + public Task Task + { + get + { + return tcs.Task; + } + } + + public AsyncCompletionDelegate CompletionDelegate + { + get + { + return completionDelegate; + } + } + + private void HandleCompletion(Exception error) + { + if (error == null) + { + tcs.SetResult(null); + return; + } + if (error is OperationCanceledException) + { + tcs.SetCanceled(); + return; + } + tcs.SetException(error); + } + } + +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 1c0bc98f06..f399c5d9b8 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -38,7 +38,6 @@ using Grpc.Core; namespace Grpc.Core.Internal { - //TODO: rename the delegate internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs index fb59e86e2d..859f2ee027 100644 --- a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs +++ b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs @@ -47,9 +47,10 @@ namespace Grpc.Core.Internal public void OnCompleted() { - + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendCloseFromClient(taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendCloseFromClientAsync().Wait(); + taskSource.Task.Wait(); } public void OnError(Exception error) @@ -59,8 +60,10 @@ namespace Grpc.Core.Internal public void OnNext(TWrite value) { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendMessage(value, taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendMessageAsync(value).Wait(); + taskSource.Task.Wait(); } } } diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs index 08d9921475..707070f98a 100644 --- a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs +++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs @@ -40,19 +40,21 @@ namespace Grpc.Core.Internal /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) /// and then halfcloses the call. Used for server-side call handling. /// </summary> - internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite> + internal class ServerStreamingOutputObserver<TRequest, TResponse> : IObserver<TResponse> { - readonly AsyncCall<TWrite, TRead> call; + readonly AsyncCallServer<TRequest, TResponse> call; - public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call) + public ServerStreamingOutputObserver(AsyncCallServer<TRequest, TResponse> call) { this.call = call; } public void OnCompleted() { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendStatusFromServer(new Status(StatusCode.OK, ""), taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait(); + taskSource.Task.Wait(); } public void OnError(Exception error) @@ -61,10 +63,12 @@ namespace Grpc.Core.Internal throw new InvalidOperationException("This should never be called."); } - public void OnNext(TWrite value) + public void OnNext(TResponse value) { + var taskSource = new AsyncCompletionTaskSource(); + call.StartSendMessage(value, taskSource.CompletionDelegate); // TODO: how bad is the Wait here? - call.SendMessageAsync(value).Wait(); + taskSource.Task.Wait(); } } } diff --git a/src/csharp/Grpc.Core/OperationFailedException.cs b/src/csharp/Grpc.Core/OperationFailedException.cs new file mode 100644 index 0000000000..34a8c95a85 --- /dev/null +++ b/src/csharp/Grpc.Core/OperationFailedException.cs @@ -0,0 +1,48 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// Thrown when gRPC operation fails. + /// </summary> + public class OperationFailedException : Exception + { + public OperationFailedException(string message) : base(message) + { + } + } +} + diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs index 289f97aece..3eb8422f57 100644 --- a/src/csharp/Grpc.Core/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/ServerCallHandler.cs @@ -32,7 +32,9 @@ #endregion using System; +using System.Linq; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { @@ -54,17 +56,17 @@ namespace Grpc.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { - var asyncCall = new AsyncCall<TResponse, TRequest>( + var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer); - asyncCall.InitializeServer(call); + asyncCall.Initialize(call); - var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); + var requestObserver = new RecordingObserver<TRequest>(); + var finishedTask = asyncCall.ServerSideCallAsync(requestObserver); - var request = asyncCall.ReceiveMessageAsync().Result; - - var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); + var request = requestObserver.ToList().Result.Single(); + var responseObserver = new ServerStreamingOutputObserver<TRequest, TResponse>(asyncCall); handler(request, responseObserver); finishedTask.Wait(); @@ -85,15 +87,15 @@ namespace Grpc.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { - var asyncCall = new AsyncCall<TResponse, TRequest>( + var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer); - asyncCall.InitializeServer(call); + asyncCall.Initialize(call); - var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); + var responseObserver = new ServerStreamingOutputObserver<TRequest,TResponse>(asyncCall); var requestObserver = handler(responseObserver); - var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver); + var finishedTask = asyncCall.ServerSideCallAsync(requestObserver); finishedTask.Wait(); } } @@ -103,17 +105,15 @@ namespace Grpc.Core public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) { // We don't care about the payload type here. - AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( + var asyncCall = new AsyncCallServer<byte[], byte[]>( (payload) => payload, (payload) => payload); + asyncCall.Initialize(call); - asyncCall.InitializeServer(call); - - var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>()); + var finishedTask = asyncCall.ServerSideCallAsync(new NullObserver<byte[]>()); - // TODO: this makes the call finish before all reads can be done which causes trouble - // in AsyncCall.HandleReadFinished callback. Revisit this. - asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait(); + // TODO: check result of the completion status. + asyncCall.StartSendStatusFromServer(new Status(StatusCode.Unimplemented, "No such method."), new AsyncCompletionDelegate((error) => {})); finishedTask.Wait(); } diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs index 5ea1df7b48..6ba5d992a6 100644 --- a/src/csharp/Grpc.Core/Status.cs +++ b/src/csharp/Grpc.Core/Status.cs @@ -50,6 +50,9 @@ namespace Grpc.Core this.detail = detail; } + /// <summary> + /// Gets the gRPC status code. OK indicates success, all other values indicate an error. + /// </summary> public StatusCode StatusCode { get @@ -58,6 +61,9 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the detail. + /// </summary> public string Detail { get diff --git a/src/csharp/Grpc.Core/Utils/Preconditions.cs b/src/csharp/Grpc.Core/Utils/Preconditions.cs new file mode 100644 index 0000000000..b17ce42117 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/Preconditions.cs @@ -0,0 +1,113 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace Grpc.Core.Utils +{ + public static class Preconditions + { + /// <summary> + /// Throws ArgumentException if condition is false. + /// </summary> + public static void CheckArgument(bool condition) + { + if (!condition) + { + throw new ArgumentException(); + } + } + + /// <summary> + /// Throws ArgumentException with given message if condition is false. + /// </summary> + public static void CheckArgument(bool condition, string errorMessage) + { + if (!condition) + { + throw new ArgumentException(errorMessage); + } + } + + /// <summary> + /// Throws NullReferenceException if reference is null. + /// </summary> + public static T CheckNotNull<T> (T reference) + { + if (reference == null) + { + throw new NullReferenceException(); + } + return reference; + } + + /// <summary> + /// Throws NullReferenceException with given message if reference is null. + /// </summary> + public static T CheckNotNull<T> (T reference, string errorMessage) + { + if (reference == null) + { + throw new NullReferenceException(errorMessage); + } + return reference; + } + + /// <summary> + /// Throws InvalidOperationException if condition is false. + /// </summary> + public static void CheckState(bool condition) + { + if (!condition) + { + throw new InvalidOperationException(); + } + } + + /// <summary> + /// Throws InvalidOperationException with given message if condition is false. + /// </summary> + public static void CheckState(bool condition, string errorMessage) + { + if (!condition) + { + throw new InvalidOperationException(errorMessage); + } + } + } +} + |