From 8b86b15e67fdcb7dc175ff3779b1f4ae4e7ec2ed Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Thu, 19 Feb 2015 21:01:05 -0800 Subject: renaming of VS projects and other minor structural fixes --- src/csharp/Grpc.Core/.gitignore | 2 + src/csharp/Grpc.Core/Call.cs | 98 ++++ src/csharp/Grpc.Core/Calls.cs | 108 ++++ src/csharp/Grpc.Core/Channel.cs | 85 +++ src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs | 70 +++ src/csharp/Grpc.Core/Grpc.Core.csproj | 71 +++ src/csharp/Grpc.Core/GrpcEnvironment.cs | 135 +++++ src/csharp/Grpc.Core/Internal/AsyncCall.cs | 616 +++++++++++++++++++++ .../Internal/BatchContextSafeHandleNotOwned.cs | 96 ++++ src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 181 ++++++ src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs | 67 +++ .../Internal/ClientStreamingInputObserver.cs | 67 +++ .../Internal/CompletionQueueSafeHandle.cs | 83 +++ src/csharp/Grpc.Core/Internal/Enums.cs | 115 ++++ src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs | 125 +++++ .../Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs | 67 +++ src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs | 112 ++++ .../Internal/ServerStreamingOutputObserver.cs | 71 +++ src/csharp/Grpc.Core/Internal/Timespec.cs | 114 ++++ src/csharp/Grpc.Core/Marshaller.cs | 87 +++ src/csharp/Grpc.Core/Method.cs | 97 ++++ src/csharp/Grpc.Core/Properties/AssemblyInfo.cs | 24 + src/csharp/Grpc.Core/RpcException.cs | 60 ++ src/csharp/Grpc.Core/Server.cs | 213 +++++++ src/csharp/Grpc.Core/ServerCallHandler.cs | 136 +++++ src/csharp/Grpc.Core/ServerCalls.cs | 58 ++ src/csharp/Grpc.Core/ServerServiceDefinition.cs | 98 ++++ src/csharp/Grpc.Core/Status.cs | 69 +++ src/csharp/Grpc.Core/StatusCode.cs | 181 ++++++ src/csharp/Grpc.Core/Utils/RecordingObserver.cs | 65 +++ src/csharp/Grpc.Core/Utils/RecordingQueue.cs | 84 +++ 31 files changed, 3455 insertions(+) create mode 100644 src/csharp/Grpc.Core/.gitignore create mode 100644 src/csharp/Grpc.Core/Call.cs create mode 100644 src/csharp/Grpc.Core/Calls.cs create mode 100644 src/csharp/Grpc.Core/Channel.cs create mode 100644 src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs create mode 100644 src/csharp/Grpc.Core/Grpc.Core.csproj create mode 100644 src/csharp/Grpc.Core/GrpcEnvironment.cs create mode 100644 src/csharp/Grpc.Core/Internal/AsyncCall.cs create mode 100644 src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs create mode 100644 src/csharp/Grpc.Core/Internal/CallSafeHandle.cs create mode 100644 src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs create mode 100644 src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs create mode 100644 src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs create mode 100644 src/csharp/Grpc.Core/Internal/Enums.cs create mode 100644 src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs create mode 100644 src/csharp/Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs create mode 100644 src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs create mode 100644 src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs create mode 100644 src/csharp/Grpc.Core/Internal/Timespec.cs create mode 100644 src/csharp/Grpc.Core/Marshaller.cs create mode 100644 src/csharp/Grpc.Core/Method.cs create mode 100644 src/csharp/Grpc.Core/Properties/AssemblyInfo.cs create mode 100644 src/csharp/Grpc.Core/RpcException.cs create mode 100644 src/csharp/Grpc.Core/Server.cs create mode 100644 src/csharp/Grpc.Core/ServerCallHandler.cs create mode 100644 src/csharp/Grpc.Core/ServerCalls.cs create mode 100644 src/csharp/Grpc.Core/ServerServiceDefinition.cs create mode 100644 src/csharp/Grpc.Core/Status.cs create mode 100644 src/csharp/Grpc.Core/StatusCode.cs create mode 100644 src/csharp/Grpc.Core/Utils/RecordingObserver.cs create mode 100644 src/csharp/Grpc.Core/Utils/RecordingQueue.cs (limited to 'src/csharp/Grpc.Core') diff --git a/src/csharp/Grpc.Core/.gitignore b/src/csharp/Grpc.Core/.gitignore new file mode 100644 index 0000000000..8d4a6c08a8 --- /dev/null +++ b/src/csharp/Grpc.Core/.gitignore @@ -0,0 +1,2 @@ +bin +obj \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs new file mode 100644 index 0000000000..72dca68895 --- /dev/null +++ b/src/csharp/Grpc.Core/Call.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + public class Call + { + readonly string methodName; + readonly Func requestSerializer; + readonly Func responseDeserializer; + readonly Channel channel; + + public Call(string methodName, + Func requestSerializer, + Func responseDeserializer, + TimeSpan timeout, + Channel channel) { + this.methodName = methodName; + this.requestSerializer = requestSerializer; + this.responseDeserializer = responseDeserializer; + this.channel = channel; + } + + public Call(Method method, Channel channel) + { + this.methodName = method.Name; + this.requestSerializer = method.RequestMarshaller.Serializer; + this.responseDeserializer = method.ResponseMarshaller.Deserializer; + this.channel = channel; + } + + public Channel Channel + { + get + { + return this.channel; + } + } + + public string MethodName + { + get + { + return this.methodName; + } + } + + public Func RequestSerializer + { + get + { + return this.requestSerializer; + } + } + + public Func ResponseDeserializer + { + get + { + return this.responseDeserializer; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs new file mode 100644 index 0000000000..b67332676a --- /dev/null +++ b/src/csharp/Grpc.Core/Calls.cs @@ -0,0 +1,108 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + // NOTE: this class is work-in-progress + + /// + /// Helper methods for generated stubs to make RPC calls. + /// + public static class Calls + { + public static TResponse BlockingUnaryCall(Call call, TRequest req, CancellationToken token) + { + //TODO: implement this in real synchronous style. + try { + return AsyncUnaryCall(call, req, token).Result; + } catch(AggregateException ae) { + foreach (var e in ae.InnerExceptions) + { + if (e is RpcException) + { + throw e; + } + } + throw; + } + } + + public static async Task AsyncUnaryCall(Call call, TRequest req, CancellationToken token) + { + var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + return await asyncCall.UnaryCallAsync(req); + } + + public static void AsyncServerStreamingCall(Call call, TRequest req, IObserver outputs, CancellationToken token) + { + var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); + + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + asyncCall.StartServerStreamingCall(req, outputs); + } + + public static ClientStreamingAsyncResult AsyncClientStreamingCall(Call call, CancellationToken token) + { + var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + var task = asyncCall.ClientStreamingCallAsync(); + var inputs = new ClientStreamingInputObserver(asyncCall); + return new ClientStreamingAsyncResult(task, inputs); + } + + public static TResponse BlockingClientStreamingCall(Call call, IObservable inputs, CancellationToken token) + { + throw new NotImplementedException(); + } + + public static IObserver DuplexStreamingCall(Call call, IObserver outputs, CancellationToken token) + { + var asyncCall = new AsyncCall(call.RequestSerializer, call.ResponseDeserializer); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + + asyncCall.StartDuplexStreamingCall(outputs); + return new ClientStreamingInputObserver(asyncCall); + } + + private static CompletionQueueSafeHandle GetCompletionQueue() { + return GrpcEnvironment.ThreadPool.CompletionQueue; + } + } +} + diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs new file mode 100644 index 0000000000..942651cf39 --- /dev/null +++ b/src/csharp/Grpc.Core/Channel.cs @@ -0,0 +1,85 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + public class Channel : IDisposable + { + readonly ChannelSafeHandle handle; + readonly String target; + + // TODO: add way how to create grpc_secure_channel.... + // TODO: add support for channel args... + public Channel(string target) + { + this.handle = ChannelSafeHandle.Create(target, IntPtr.Zero); + this.target = target; + } + + internal ChannelSafeHandle Handle + { + get + { + return this.handle; + } + } + + public string Target + { + get + { + return this.target; + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (handle != null && !handle.IsInvalid) + { + handle.Dispose(); + } + } + } +} diff --git a/src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs b/src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs new file mode 100644 index 0000000000..44580a1154 --- /dev/null +++ b/src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs @@ -0,0 +1,70 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; + +namespace Grpc.Core +{ + /// + /// Return type for client streaming async method. + /// + public struct ClientStreamingAsyncResult + { + readonly Task task; + readonly IObserver inputs; + + public ClientStreamingAsyncResult(Task task, IObserver inputs) + { + this.task = task; + this.inputs = inputs; + } + + public Task Task + { + get + { + return this.task; + } + } + + public IObserver Inputs + { + get + { + return this.inputs; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj new file mode 100644 index 0000000000..4ad32e10e4 --- /dev/null +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -0,0 +1,71 @@ + + + + Debug + AnyCPU + 10.0.0 + 2.0 + {CCC4440E-49F7-4790-B0AF-FEABB0837AE7} + Library + Grpc.Core + Grpc.Core + v4.5 + + + true + full + false + bin\Debug + DEBUG; + prompt + 4 + false + + + full + true + bin\Release + prompt + 4 + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs new file mode 100644 index 0000000000..0e3a0a581c --- /dev/null +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -0,0 +1,135 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + /// + /// Encapsulates initialization and shutdown of gRPC library. + /// + public class GrpcEnvironment + { + const int THREAD_POOL_SIZE = 4; + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_init(); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_shutdown(); + + static object staticLock = new object(); + static volatile GrpcEnvironment instance; + + readonly GrpcThreadPool threadPool; + bool isClosed; + + /// + /// Makes sure GRPC environment is initialized. Subsequent invocations don't have any + /// effect unless you call Shutdown first. + /// Although normal use cases assume you will call this just once in your application's + /// lifetime (and call Shutdown once you're done), for the sake of easier testing it's + /// allowed to initialize the environment again after it has been successfully shutdown. + /// + public static void Initialize() { + lock(staticLock) + { + if (instance == null) + { + instance = new GrpcEnvironment(); + } + } + } + + /// + /// Shuts down the GRPC environment if it was initialized before. + /// Repeated invocations have no effect. + /// + public static void Shutdown() + { + lock(staticLock) + { + if (instance != null) + { + instance.Close(); + instance = null; + } + } + } + + internal static GrpcThreadPool ThreadPool + { + get + { + var inst = instance; + if (inst == null) + { + throw new InvalidOperationException("GRPC environment not initialized"); + } + return inst.threadPool; + } + } + + /// + /// Creates gRPC environment. + /// + private GrpcEnvironment() + { + grpcsharp_init(); + threadPool = new GrpcThreadPool(THREAD_POOL_SIZE); + threadPool.Start(); + // TODO: use proper logging here + Console.WriteLine("GRPC initialized."); + } + + /// + /// Shuts down this environment. + /// + private void Close() + { + if (isClosed) + { + throw new InvalidOperationException("Close has already been called"); + } + threadPool.Stop(); + grpcsharp_shutdown(); + isClosed = true; + + // TODO: use proper logging here + Console.WriteLine("GRPC shutdown."); + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs new file mode 100644 index 0000000000..5e96092e27 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -0,0 +1,616 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// + /// Handles native call lifecycle and provides convenience methods. + /// + internal class AsyncCall + { + readonly Func serializer; + readonly Func deserializer; + + readonly CompletionCallbackDelegate unaryResponseHandler; + readonly CompletionCallbackDelegate finishedHandler; + readonly CompletionCallbackDelegate writeFinishedHandler; + readonly CompletionCallbackDelegate readFinishedHandler; + readonly CompletionCallbackDelegate halfclosedHandler; + readonly CompletionCallbackDelegate finishedServersideHandler; + + object myLock = new object(); + GCHandle gchandle; + CallSafeHandle call; + bool disposed; + + bool server; + + bool started; + bool errorOccured; + bool cancelRequested; + bool readingDone; + bool halfcloseRequested; + bool halfclosed; + bool finished; + + // Completion of a pending write if not null. + TaskCompletionSource writeTcs; + + // Completion of a pending read if not null. + TaskCompletionSource readTcs; + + // Completion of a pending halfclose if not null. + TaskCompletionSource halfcloseTcs; + + // Completion of a pending unary response if not null. + TaskCompletionSource unaryResponseTcs; + + // Set after status is received on client. Only used for server streaming and duplex streaming calls. + Nullable finishedStatus; + TaskCompletionSource finishedServersideTcs = new TaskCompletionSource(); + + // For streaming, the reads will be delivered to this observer. + IObserver readObserver; + + public AsyncCall(Func serializer, Func deserializer) + { + this.serializer = serializer; + this.deserializer = deserializer; + this.unaryResponseHandler = HandleUnaryResponse; + this.finishedHandler = HandleFinished; + this.writeFinishedHandler = HandleWriteFinished; + this.readFinishedHandler = HandleReadFinished; + this.halfclosedHandler = HandleHalfclosed; + this.finishedServersideHandler = HandleFinishedServerside; + } + + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) + { + InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); + } + + public void InitializeServer(CallSafeHandle call) + { + InitializeInternal(call, true); + } + + public Task UnaryCallAsync(TWrite msg) + { + lock (myLock) + { + started = true; + halfcloseRequested = true; + readingDone = true; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + unaryResponseTcs = new TaskCompletionSource(); + call.StartUnary(payload, unaryResponseHandler); + + return unaryResponseTcs.Task; + } + } + + public Task ClientStreamingCallAsync() + { + lock (myLock) + { + started = true; + readingDone = true; + + unaryResponseTcs = new TaskCompletionSource(); + call.StartClientStreaming(unaryResponseHandler); + + return unaryResponseTcs.Task; + } + } + + public void StartServerStreamingCall(TWrite msg, IObserver readObserver) + { + lock (myLock) + { + started = true; + halfcloseRequested = true; + + this.readObserver = readObserver; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + call.StartServerStreaming(payload, finishedHandler); + + ReceiveMessageAsync(); + } + } + + public void StartDuplexStreamingCall(IObserver readObserver) + { + lock (myLock) + { + started = true; + + this.readObserver = readObserver; + + call.StartDuplexStreaming(finishedHandler); + + ReceiveMessageAsync(); + } + } + + public Task ServerSideUnaryRequestCallAsync() + { + lock (myLock) + { + started = true; + call.StartServerSide(finishedServersideHandler); + return finishedServersideTcs.Task; + } + } + + public Task ServerSideStreamingRequestCallAsync(IObserver readObserver) + { + lock (myLock) + { + started = true; + call.StartServerSide(finishedServersideHandler); + + if (this.readObserver != null) + { + throw new InvalidOperationException("Already registered an observer."); + } + this.readObserver = readObserver; + ReceiveMessageAsync(); + + return finishedServersideTcs.Task; + } + } + + public Task SendMessageAsync(TWrite msg) + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (halfcloseRequested) + { + throw new InvalidOperationException("Already halfclosed."); + } + + if (writeTcs != null) + { + throw new InvalidOperationException("Only one write can be pending at a time"); + } + + // TODO: wrap serialization... + byte[] payload = serializer(msg); + + call.StartSendMessage(payload, writeFinishedHandler); + writeTcs = new TaskCompletionSource(); + return writeTcs.Task; + } + } + + public Task SendCloseFromClientAsync() + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (halfcloseRequested) + { + throw new InvalidOperationException("Already halfclosed."); + } + + call.StartSendCloseFromClient(halfclosedHandler); + + halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource(); + return halfcloseTcs.Task; + } + } + + public Task SendStatusFromServerAsync(Status status) + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (halfcloseRequested) + { + throw new InvalidOperationException("Already halfclosed."); + } + + call.StartSendStatusFromServer(status, halfclosedHandler); + halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource(); + return halfcloseTcs.Task; + } + } + + public Task ReceiveMessageAsync() + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (readingDone) + { + throw new InvalidOperationException("Already read the last message."); + } + + if (readTcs != null) + { + throw new InvalidOperationException("Only one read can be pending at a time"); + } + + call.StartReceiveMessage(readFinishedHandler); + + readTcs = new TaskCompletionSource(); + return readTcs.Task; + } + } + + public void Cancel() + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + cancelRequested = true; + } + // grpc_call_cancel is threadsafe + call.Cancel(); + } + + public void CancelWithStatus(Status status) + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + cancelRequested = true; + } + // grpc_call_cancel_with_status is threadsafe + call.CancelWithStatus(status); + } + + private void InitializeInternal(CallSafeHandle call, bool server) + { + lock (myLock) + { + // Make sure this object and the delegated held by it will not be garbage collected + // before we release this handle. + gchandle = GCHandle.Alloc(this); + this.call = call; + this.server = server; + } + } + + private void CheckStarted() + { + if (!started) + { + throw new InvalidOperationException("Call not started"); + } + } + + private void CheckNotDisposed() + { + if (disposed) + { + throw new InvalidOperationException("Call has already been disposed."); + } + } + + private void CheckNoError() + { + if (errorOccured) + { + throw new InvalidOperationException("Error occured when processing call."); + } + } + + private bool ReleaseResourcesIfPossible() + { + if (!disposed && call != null) + { + if (halfclosed && readingDone && finished) + { + ReleaseResources(); + return true; + } + } + return false; + } + + private void ReleaseResources() + { + if (call != null) { + call.Dispose(); + } + gchandle.Free(); + disposed = true; + } + + private void CompleteStreamObserver(Status status) + { + if (status.StatusCode != StatusCode.OK) + { + // TODO: wrap to handle exceptions; + readObserver.OnError(new RpcException(status)); + } else { + // TODO: wrap to handle exceptions; + readObserver.OnCompleted(); + } + } + + /// + /// Handler for unary response completion. + /// + private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + TaskCompletionSource tcs; + lock(myLock) + { + finished = true; + halfclosed = true; + tcs = unaryResponseTcs; + + ReleaseResourcesIfPossible(); + } + + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) + { + tcs.SetException(new RpcException( + new Status(StatusCode.Internal, "Internal error occured.") + )); + return; + } + + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.OK) + { + tcs.SetException(new RpcException(status)); + return; + } + + // TODO: handle deserialize error... + var msg = deserializer(ctx.GetReceivedMessage()); + tcs.SetResult(msg); + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + TaskCompletionSource oldTcs = null; + lock (myLock) + { + oldTcs = writeTcs; + writeTcs = null; + } + + if (errorOccured) + { + // TODO: use the right type of exception... + oldTcs.SetException(new Exception("Write failed")); + } + else + { + // TODO: where does the continuation run? + oldTcs.SetResult(null); + } + + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + lock (myLock) + { + halfclosed = true; + + ReleaseResourcesIfPossible(); + } + + if (error != GRPCOpError.GRPC_OP_OK) + { + halfcloseTcs.SetException(new Exception("Halfclose failed")); + + } + else + { + halfcloseTcs.SetResult(null); + } + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var payload = ctx.GetReceivedMessage(); + + TaskCompletionSource oldTcs = null; + IObserver observer = null; + + Nullable status = null; + + lock (myLock) + { + oldTcs = readTcs; + readTcs = null; + if (payload == null) + { + readingDone = true; + } + observer = readObserver; + status = finishedStatus; + } + + // TODO: wrap deserialization... + TRead msg = payload != null ? deserializer(payload) : default(TRead); + + oldTcs.SetResult(msg); + + // TODO: make sure we deliver reads in the right order. + + if (observer != null) + { + if (payload != null) + { + // TODO: wrap to handle exceptions + observer.OnNext(msg); + + // start a new read + ReceiveMessageAsync(); + } + else + { + if (!server) + { + if (status.HasValue) + { + CompleteStreamObserver(status.Value); + } + } + else + { + // TODO: wrap to handle exceptions.. + observer.OnCompleted(); + } + // TODO: completeStreamObserver serverside... + } + } + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var status = ctx.GetReceivedStatus(); + + bool wasReadingDone; + + lock (myLock) + { + finished = true; + finishedStatus = status; + + wasReadingDone = readingDone; + + ReleaseResourcesIfPossible(); + } + + if (wasReadingDone) { + CompleteStreamObserver(status); + } + + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + lock(myLock) + { + finished = true; + + // TODO: because of the way server calls are implemented, we need to set + // reading done to true here. Should be fixed in the future. + readingDone = true; + + ReleaseResourcesIfPossible(); + } + // TODO: handle error ... + + finishedServersideTcs.SetResult(null); + + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + } +} \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs new file mode 100644 index 0000000000..75cd30e1a2 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs @@ -0,0 +1,96 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Grpc.Core; + +namespace Grpc.Core.Internal +{ + /// + /// Not owned version of + /// grpcsharp_batch_context + /// + internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen); + + [DllImport("grpc_csharp_ext.dll")] + static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char* + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + + public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) + { + SetHandle(handle); + } + + public Status GetReceivedStatus() + { + // TODO: can the native method return string directly? + string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this)); + return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details); + } + + public byte[] GetReceivedMessage() + { + IntPtr len = grpcsharp_batch_context_recv_message_length(this); + if (len == new IntPtr(-1)) + { + return null; + } + byte[] data = new byte[(int) len]; + grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length)); + return data; + } + + public CallSafeHandle GetServerRpcNewCall() { + return grpcsharp_batch_context_server_rpc_new_call(this); + } + + public string GetServerRpcNewMethod() { + return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); + } + } +} \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs new file mode 100644 index 0000000000..659a383b4b --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -0,0 +1,181 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.InteropServices; +using Grpc.Core; + +namespace Grpc.Core.Internal +{ + //TODO: rename the delegate + internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); + + /// + /// grpc_call from + /// + internal class CallSafeHandle : SafeHandleZeroIsInvalid + { + const UInt32 GRPC_WRITE_BUFFER_HINT = 1; + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_call_destroy(IntPtr call); + + + private CallSafeHandle() + { + } + + public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + { + return grpcsharp_channel_create_call(channel, cq, method, host, deadline); + } + + public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); + } + + public void StartClientStreaming(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); + } + + public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length))); + } + + public void StartDuplexStreaming(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback)); + } + + public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length))); + } + + public void StartSendCloseFromClient(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); + } + + public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); + } + + public void StartReceiveMessage(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_recv_message(this, callback)); + } + + public void StartServerSide(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_serverside(this, callback)); + } + + public void Cancel() + { + AssertCallOk(grpcsharp_call_cancel(this)); + } + + public void CancelWithStatus(Status status) + { + AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail)); + } + + protected override bool ReleaseHandle() + { + grpcsharp_call_destroy(handle); + return true; + } + + private static void AssertCallOk(GRPCCallError callError) + { + Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } + + private static UInt32 GetFlags(bool buffered) { + return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; + } + } +} \ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs new file mode 100644 index 0000000000..f15ead3572 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -0,0 +1,67 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace Grpc.Core.Internal +{ + /// + /// grpc_channel from + /// + internal class ChannelSafeHandle : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern ChannelSafeHandle grpcsharp_channel_create(string target, IntPtr channelArgs); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_channel_destroy(IntPtr channel); + + private ChannelSafeHandle() + { + } + + public static ChannelSafeHandle Create(string target, IntPtr channelArgs) + { + return grpcsharp_channel_create(target, channelArgs); + } + + protected override bool ReleaseHandle() + { + grpcsharp_channel_destroy(handle); + return true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs new file mode 100644 index 0000000000..fb59e86e2d --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs @@ -0,0 +1,67 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + internal class ClientStreamingInputObserver : IObserver + { + readonly AsyncCall call; + + public ClientStreamingInputObserver(AsyncCall call) + { + this.call = call; + } + + public void OnCompleted() + { + + // TODO: how bad is the Wait here? + call.SendCloseFromClientAsync().Wait(); + } + + public void OnError(Exception error) + { + throw new InvalidOperationException("This should never be called."); + } + + public void OnNext(TWrite value) + { + // TODO: how bad is the Wait here? + call.SendMessageAsync(value).Wait(); + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs new file mode 100644 index 0000000000..3f01fdbfd0 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -0,0 +1,83 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +namespace Grpc.Core.Internal +{ + /// + /// grpc_completion_queue from + /// + internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create(); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_completion_queue_destroy(IntPtr cq); + + private CompletionQueueSafeHandle() + { + } + + public static CompletionQueueSafeHandle Create() + { + return grpcsharp_completion_queue_create(); + } + + public GRPCCompletionType NextWithCallback() + { + return grpcsharp_completion_queue_next_with_callback(this); + } + + public void Shutdown() + { + grpcsharp_completion_queue_shutdown(this); + } + + protected override bool ReleaseHandle() + { + grpcsharp_completion_queue_destroy(handle); + return true; + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs new file mode 100644 index 0000000000..f363050b07 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/Enums.cs @@ -0,0 +1,115 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal +{ + /// + /// from grpc/grpc.h + /// + internal enum GRPCCallError + { + /* everything went ok */ + GRPC_CALL_OK = 0, + /* something failed, we don't know what */ + GRPC_CALL_ERROR, + /* this method is not available on the server */ + GRPC_CALL_ERROR_NOT_ON_SERVER, + /* this method is not available on the client */ + GRPC_CALL_ERROR_NOT_ON_CLIENT, + /* this method must be called before server_accept */ + GRPC_CALL_ERROR_ALREADY_ACCEPTED, + /* this method must be called before invoke */ + GRPC_CALL_ERROR_ALREADY_INVOKED, + /* this method must be called after invoke */ + GRPC_CALL_ERROR_NOT_INVOKED, + /* this call is already finished + (writes_done or write_status has already been called) */ + GRPC_CALL_ERROR_ALREADY_FINISHED, + /* there is already an outstanding read/write operation on the call */ + GRPC_CALL_ERROR_TOO_MANY_OPERATIONS, + /* the flags value was illegal for this call */ + GRPC_CALL_ERROR_INVALID_FLAGS + } + + /// + /// grpc_completion_type from grpc/grpc.h + /// + internal enum GRPCCompletionType + { + /* Shutting down */ + GRPC_QUEUE_SHUTDOWN, + + /* operation completion */ + GRPC_OP_COMPLETE, + + /* A read has completed */ + GRPC_READ, + + /* A write has been accepted by flow control */ + GRPC_WRITE_ACCEPTED, + + /* writes_done or write_status has been accepted */ + GRPC_FINISH_ACCEPTED, + + /* The metadata array sent by server received at client */ + GRPC_CLIENT_METADATA_READ, + + /* An RPC has finished. The event contains status. + * On the server this will be OK or Cancelled. */ + GRPC_FINISHED, + + /* A new RPC has arrived at the server */ + GRPC_SERVER_RPC_NEW, + + /* The server has finished shutting down */ + GRPC_SERVER_SHUTDOWN, + + /* must be last, forces users to include a default: case */ + GRPC_COMPLETION_DO_NOT_USE + } + + /// + /// grpc_op_error from grpc/grpc.h + /// + internal enum GRPCOpError + { + /* everything went ok */ + GRPC_OP_OK = 0, + /* something failed, we don't know what */ + GRPC_OP_ERROR + } +} + diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs new file mode 100644 index 0000000000..9e69fe2f43 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -0,0 +1,125 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// + /// Pool of threads polling on the same completion queue. + /// + internal class GrpcThreadPool + { + readonly object myLock = new object(); + readonly List threads = new List(); + readonly int poolSize; + + CompletionQueueSafeHandle cq; + + public GrpcThreadPool(int poolSize) { + this.poolSize = poolSize; + } + + public void Start() { + + lock (myLock) + { + if (cq != null) + { + throw new InvalidOperationException("Already started."); + } + + cq = CompletionQueueSafeHandle.Create(); + + for (int i = 0; i < poolSize; i++) + { + threads.Add(CreateAndStartThread(i)); + } + } + } + + public void Stop() { + + lock (myLock) + { + cq.Shutdown(); + + Console.WriteLine("Waiting for GPRC threads to finish."); + foreach (var thread in threads) + { + thread.Join(); + } + + cq.Dispose(); + + } + } + + internal CompletionQueueSafeHandle CompletionQueue + { + get + { + return cq; + } + } + + private Thread CreateAndStartThread(int i) + { + var thread = new Thread(new ThreadStart(RunHandlerLoop)); + thread.IsBackground = false; + thread.Start(); + thread.Name = "grpc " + i; + return thread; + } + + /// + /// Body of the polling thread. + /// + private void RunHandlerLoop() + { + GRPCCompletionType completionType; + do + { + completionType = cq.NextWithCallback(); + } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); + Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); + } + } + +} + diff --git a/src/csharp/Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs b/src/csharp/Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs new file mode 100644 index 0000000000..aa6fce2e96 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs @@ -0,0 +1,67 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal +{ + /// + /// Safe handle to wrap native objects. + /// + internal abstract class SafeHandleZeroIsInvalid : SafeHandle + { + public SafeHandleZeroIsInvalid() : base(IntPtr.Zero, true) + { + } + + public SafeHandleZeroIsInvalid(bool ownsHandle) : base(IntPtr.Zero, ownsHandle) + { + } + + public override bool IsInvalid + { + get + { + return handle == IntPtr.Zero; + } + } + + protected override bool ReleaseHandle() + { + // handle is not owned. + return true; + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs new file mode 100644 index 0000000000..de9bbaf7c1 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -0,0 +1,112 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal +{ + // TODO: we need to make sure that the delegates are not collected before invoked. + internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); + + /// + /// grpc_server from grpc/grpc.h + /// + internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + + [DllImport("grpc_csharp_ext.dll")] + static extern Int32 grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_start(ServerSafeHandle server); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_shutdown(ServerSafeHandle server); + + // TODO: get rid of the old callback style + [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")] + static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_destroy(IntPtr server); + + private ServerSafeHandle() + { + } + + public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args) + { + // TODO: also grpc_secure_server_create... + return grpcsharp_server_create(cq, args); + } + + public int AddPort(string addr) + { + return grpcsharp_server_add_http2_port(this, addr); + } + + public void Start() + { + grpcsharp_server_start(this); + } + + public void Shutdown() + { + grpcsharp_server_shutdown(this); + } + + public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) + { + grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); + } + + public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) + { + return grpcsharp_server_request_call(this, cq, callback); + } + + protected override bool ReleaseHandle() + { + grpcsharp_server_destroy(handle); + return true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs new file mode 100644 index 0000000000..08d9921475 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs @@ -0,0 +1,71 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// + /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) + /// and then halfcloses the call. Used for server-side call handling. + /// + internal class ServerStreamingOutputObserver : IObserver + { + readonly AsyncCall call; + + public ServerStreamingOutputObserver(AsyncCall call) + { + this.call = call; + } + + public void OnCompleted() + { + // TODO: how bad is the Wait here? + call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait(); + } + + public void OnError(Exception error) + { + // TODO: implement this... + throw new InvalidOperationException("This should never be called."); + } + + public void OnNext(TWrite value) + { + // TODO: how bad is the Wait here? + call.SendMessageAsync(value).Wait(); + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs new file mode 100644 index 0000000000..b191ecde94 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -0,0 +1,114 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; + +namespace Grpc.Core.Internal +{ + /// + /// gpr_timespec from grpc/support/time.h + /// + [StructLayout(LayoutKind.Sequential)] + internal struct Timespec + { + const int nanosPerSecond = 1000 * 1000 * 1000; + const int nanosPerTick = 100; + + [DllImport("grpc_csharp_ext.dll")] + static extern Timespec gprsharp_now(); + + [DllImport("grpc_csharp_ext.dll")] + static extern Timespec gprsharp_inf_future(); + + [DllImport("grpc_csharp_ext.dll")] + static extern int gprsharp_sizeof_timespec(); + + // TODO: revisit this. + // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8 + // so IntPtr seems to have the right size to work on both. + public System.IntPtr tv_sec; + public System.IntPtr tv_nsec; + + /// + /// Timespec a long time in the future. + /// + public static Timespec InfFuture + { + get + { + return gprsharp_inf_future(); + } + } + + public static Timespec Now + { + get + { + return gprsharp_now(); + } + } + + internal static int NativeSize + { + get + { + return gprsharp_sizeof_timespec(); + } + } + + /// + /// Creates a GPR deadline from current instant and given timeout. + /// + /// The from timeout. + public static Timespec DeadlineFromTimeout(TimeSpan timeout) { + if (timeout == Timeout.InfiniteTimeSpan) + { + return Timespec.InfFuture; + } + return Timespec.Now.Add(timeout); + } + + public Timespec Add(TimeSpan timeSpan) { + long nanos = tv_nsec.ToInt64() + (timeSpan.Ticks % TimeSpan.TicksPerSecond) * nanosPerTick; + long overflow_sec = (nanos > nanosPerSecond) ? 1 : 0; + + Timespec result; + result.tv_nsec = new IntPtr(nanos % nanosPerSecond); + result.tv_sec = new IntPtr(tv_sec.ToInt64() + (timeSpan.Ticks / TimeSpan.TicksPerSecond) + overflow_sec); + return result; + } + } +} + diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs new file mode 100644 index 0000000000..602e0eb824 --- /dev/null +++ b/src/csharp/Grpc.Core/Marshaller.cs @@ -0,0 +1,87 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// + /// For serializing and deserializing messages. + /// + public struct Marshaller + { + readonly Func serializer; + readonly Func deserializer; + + public Marshaller(Func serializer, Func deserializer) + { + this.serializer = serializer; + this.deserializer = deserializer; + } + + public Func Serializer + { + get + { + return this.serializer; + } + } + + public Func Deserializer + { + get + { + return this.deserializer; + } + } + } + + public static class Marshallers { + + public static Marshaller Create(Func serializer, Func deserializer) + { + return new Marshaller(serializer, deserializer); + } + + public static Marshaller StringMarshaller + { + get + { + return new Marshaller(System.Text.Encoding.UTF8.GetBytes, + System.Text.Encoding.UTF8.GetString); + } + } + + } +} + diff --git a/src/csharp/Grpc.Core/Method.cs b/src/csharp/Grpc.Core/Method.cs new file mode 100644 index 0000000000..c94aa8161f --- /dev/null +++ b/src/csharp/Grpc.Core/Method.cs @@ -0,0 +1,97 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + public enum MethodType + { + Unary, + ClientStreaming, + ServerStreaming, + DuplexStreaming + } + + /// + /// A description of a service method. + /// + public class Method + { + readonly MethodType type; + readonly string name; + readonly Marshaller requestMarshaller; + readonly Marshaller responseMarshaller; + + public Method(MethodType type, string name, Marshaller requestMarshaller, Marshaller responseMarshaller) + { + this.type = type; + this.name = name; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + } + + public MethodType Type + { + get + { + return this.type; + } + } + + public string Name + { + get + { + return this.name; + } + } + + public Marshaller RequestMarshaller + { + get + { + return this.requestMarshaller; + } + } + + public Marshaller ResponseMarshaller + { + get + { + return this.responseMarshaller; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..37ba1e2263 --- /dev/null +++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs @@ -0,0 +1,24 @@ +using System.Reflection; +using System.Runtime.CompilerServices; + +// Information about this assembly is defined by the following attributes. +// Change them to the values specific to your project. +[assembly: AssemblyTitle ("Grpc.Core")] +[assembly: AssemblyDescription ("")] +[assembly: AssemblyConfiguration ("")] +[assembly: AssemblyCompany ("")] +[assembly: AssemblyProduct ("")] +[assembly: AssemblyCopyright ("Google Inc. All rights reserved.")] +[assembly: AssemblyTrademark ("")] +[assembly: AssemblyCulture ("")] +// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}". +// The form "{Major}.{Minor}.*" will automatically update the build and revision, +// and "{Major}.{Minor}.{Build}.*" will update just the revision. +[assembly: AssemblyVersion ("0.1.*")] +// The following attributes are used to specify the signing key for the assembly, +// if desired. See the Mono documentation for more information about signing. +//[assembly: AssemblyDelaySign(false)] +//[assembly: AssemblyKeyFile("")] + +[assembly: InternalsVisibleTo("Grpc.Core.Tests")] + diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs new file mode 100644 index 0000000000..5a9d0039bc --- /dev/null +++ b/src/csharp/Grpc.Core/RpcException.cs @@ -0,0 +1,60 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + public class RpcException : Exception + { + private readonly Status status; + + public RpcException(Status status) + { + this.status = status; + } + + public RpcException(Status status, string message) : base(message) + { + this.status = status; + } + + public Status Status { + get + { + return status; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs new file mode 100644 index 0000000000..002592a3d8 --- /dev/null +++ b/src/csharp/Grpc.Core/Server.cs @@ -0,0 +1,213 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + /// + /// Server is implemented only to be able to do + /// in-process testing. + /// + public class Server + { + // TODO: make sure the delegate doesn't get garbage collected while + // native callbacks are in the completion queue. + readonly ServerShutdownCallbackDelegate serverShutdownHandler; + readonly CompletionCallbackDelegate newServerRpcHandler; + + readonly BlockingCollection newRpcQueue = new BlockingCollection(); + readonly ServerSafeHandle handle; + + readonly Dictionary callHandlers = new Dictionary(); + + readonly TaskCompletionSource shutdownTcs = new TaskCompletionSource(); + + public Server() + { + this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); + this.newServerRpcHandler = HandleNewServerRpc; + this.serverShutdownHandler = HandleServerShutdown; + } + + // only call this before Start() + public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) { + foreach(var entry in serviceDefinition.CallHandlers) + { + callHandlers.Add(entry.Key, entry.Value); + } + } + + // only call before Start() + public int AddPort(string addr) { + return handle.AddPort(addr); + } + + public void Start() + { + handle.Start(); + + // TODO: this basically means the server is single threaded.... + StartHandlingRpcs(); + } + + /// + /// Requests and handles single RPC call. + /// + internal void RunRpc() + { + AllowOneRpc(); + + try + { + var rpcInfo = newRpcQueue.Take(); + + //Console.WriteLine("Server received RPC " + rpcInfo.Method); + + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) + { + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); + } + catch(Exception e) + { + Console.WriteLine("Exception while handling RPC: " + e); + } + } + + /// + /// Requests server shutdown and when there are no more calls being serviced, + /// cleans up used resources. + /// + /// The async. + public async Task ShutdownAsync() { + handle.ShutdownAndNotify(serverShutdownHandler); + await shutdownTcs.Task; + handle.Dispose(); + } + + public void Kill() { + handle.Dispose(); + } + + private async Task StartHandlingRpcs() { + while (true) + { + await Task.Factory.StartNew(RunRpc); + } + } + + private void AllowOneRpc() + { + AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); + } + + private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) { + // TODO: handle error + } + + var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); + + // after server shutdown, the callback returns with null call + if (!rpcInfo.Call.IsInvalid) { + newRpcQueue.Add(rpcInfo); + } + + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleServerShutdown(IntPtr eventPtr) + { + try + { + shutdownTcs.SetResult(null); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private static void AssertCallOk(GRPCCallError callError) + { + Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } + + private static CompletionQueueSafeHandle GetCompletionQueue() + { + return GrpcEnvironment.ThreadPool.CompletionQueue; + } + + private struct NewRpcInfo + { + private CallSafeHandle call; + private string method; + + public NewRpcInfo(CallSafeHandle call, string method) + { + this.call = call; + this.method = method; + } + + public CallSafeHandle Call + { + get + { + return this.call; + } + } + + public string Method + { + get + { + return this.method; + } + } + } + } +} diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs new file mode 100644 index 0000000000..1296947f34 --- /dev/null +++ b/src/csharp/Grpc.Core/ServerCallHandler.cs @@ -0,0 +1,136 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + internal interface IServerCallHandler + { + void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + } + + internal class UnaryRequestServerCallHandler : IServerCallHandler + { + readonly Method method; + readonly UnaryRequestServerMethod handler; + + public UnaryRequestServerCallHandler(Method method, UnaryRequestServerMethod handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall( + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); + + asyncCall.InitializeServer(call); + + var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); + + var request = asyncCall.ReceiveMessageAsync().Result; + + var responseObserver = new ServerStreamingOutputObserver(asyncCall); + handler(request, responseObserver); + + finishedTask.Wait(); + + } + } + + internal class StreamingRequestServerCallHandler : IServerCallHandler + { + readonly Method method; + readonly StreamingRequestServerMethod handler; + + public StreamingRequestServerCallHandler(Method method, StreamingRequestServerMethod handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall( + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); + + asyncCall.InitializeServer(call); + + var responseObserver = new ServerStreamingOutputObserver(asyncCall); + var requestObserver = handler(responseObserver); + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver); + finishedTask.Wait(); + } + } + + internal class NoSuchMethodCallHandler : IServerCallHandler + { + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + // We don't care about the payload type here. + AsyncCall asyncCall = new AsyncCall( + (payload) => payload, (payload) => payload); + + + asyncCall.InitializeServer(call); + + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver()); + + asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait(); + + finishedTask.Wait(); + } + } + + internal class NullObserver : IObserver + { + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + + } +} + diff --git a/src/csharp/Grpc.Core/ServerCalls.cs b/src/csharp/Grpc.Core/ServerCalls.cs new file mode 100644 index 0000000000..bed77796de --- /dev/null +++ b/src/csharp/Grpc.Core/ServerCalls.cs @@ -0,0 +1,58 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + // TODO: perhaps add also serverSideStreaming and clientSideStreaming + + public delegate void UnaryRequestServerMethod (TRequest request, IObserver responseObserver); + + public delegate IObserver StreamingRequestServerMethod (IObserver responseObserver); + + internal static class ServerCalls { + + public static IServerCallHandler UnaryRequestCall(Method method, UnaryRequestServerMethod handler) + { + return new UnaryRequestServerCallHandler(method, handler); + } + + public static IServerCallHandler StreamingRequestCall(Method method, StreamingRequestServerMethod handler) + { + return new StreamingRequestServerCallHandler(method, handler); + } + + } +} + diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs new file mode 100644 index 0000000000..231c376062 --- /dev/null +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; + +namespace Grpc.Core +{ + public class ServerServiceDefinition + { + readonly string serviceName; + // TODO: we would need an immutable dictionary here... + readonly Dictionary callHandlers; + + private ServerServiceDefinition(string serviceName, Dictionary callHandlers) + { + this.serviceName = serviceName; + this.callHandlers = new Dictionary(callHandlers); + } + + internal Dictionary CallHandlers + { + get + { + return this.callHandlers; + } + } + + + public static Builder CreateBuilder(String serviceName) + { + return new Builder(serviceName); + } + + public class Builder + { + readonly string serviceName; + readonly Dictionary callHandlers = new Dictionary(); + + public Builder(string serviceName) + { + this.serviceName = serviceName; + } + + public Builder AddMethod( + Method method, + UnaryRequestServerMethod handler) + { + callHandlers.Add(method.Name, ServerCalls.UnaryRequestCall(method, handler)); + return this; + } + + public Builder AddMethod( + Method method, + StreamingRequestServerMethod handler) + { + callHandlers.Add(method.Name, ServerCalls.StreamingRequestCall(method, handler)); + return this; + } + + public ServerServiceDefinition Build() + { + return new ServerServiceDefinition(serviceName, callHandlers); + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs new file mode 100644 index 0000000000..5ea1df7b48 --- /dev/null +++ b/src/csharp/Grpc.Core/Status.cs @@ -0,0 +1,69 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; + +namespace Grpc.Core +{ + /// + /// Represents RPC result. + /// + public struct Status + { + readonly StatusCode statusCode; + readonly string detail; + + public Status(StatusCode statusCode, string detail) + { + this.statusCode = statusCode; + this.detail = detail; + } + + public StatusCode StatusCode + { + get + { + return statusCode; + } + } + + public string Detail + { + get + { + return detail; + } + } + } +} diff --git a/src/csharp/Grpc.Core/StatusCode.cs b/src/csharp/Grpc.Core/StatusCode.cs new file mode 100644 index 0000000000..1fbf9c1b68 --- /dev/null +++ b/src/csharp/Grpc.Core/StatusCode.cs @@ -0,0 +1,181 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + // TODO: element names should changed to comply with C# naming conventions. + /// + /// based on grpc_status_code from grpc/status.h + /// + public enum StatusCode + { + /* Not an error; returned on success + + HTTP Mapping: 200 OK */ + OK = 0, + /* The operation was cancelled (typically by the caller). + + HTTP Mapping: 499 Client Closed Request */ + Cancelled = 1, + /* Unknown error. An example of where this error may be returned is + if a Status value received from another address space belongs to + an error-space that is not known in this address space. Also + errors raised by APIs that do not return enough error information + may be converted to this error. + + HTTP Mapping: 500 Internal Server Error */ + Unknown = 2, + /* Client specified an invalid argument. Note that this differs + from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments + that are problematic regardless of the state of the system + (e.g., a malformed file name). + + HTTP Mapping: 400 Bad Request */ + InvalidArgument = 3, + /* Deadline expired before operation could complete. For operations + that change the state of the system, this error may be returned + even if the operation has completed successfully. For example, a + successful response from a server could have been delayed long + enough for the deadline to expire. + + HTTP Mapping: 504 Gateway Timeout */ + DeadlineExceeded = 4, + /* Some requested entity (e.g., file or directory) was not found. + + HTTP Mapping: 404 Not Found */ + NotFound = 5, + /* Some entity that we attempted to create (e.g., file or directory) + already exists. + + HTTP Mapping: 409 Conflict */ + AlreadyExists = 6, + /* The caller does not have permission to execute the specified + operation. PERMISSION_DENIED must not be used for rejections + caused by exhausting some resource (use RESOURCE_EXHAUSTED + instead for those errors). PERMISSION_DENIED must not be + used if the caller can not be identified (use UNAUTHENTICATED + instead for those errors). + + HTTP Mapping: 403 Forbidden */ + PermissionDenied = 7, + /* The request does not have valid authentication credentials for the + operation. + + HTTP Mapping: 401 Unauthorized */ + Unauthenticated = 16, + /* Some resource has been exhausted, perhaps a per-user quota, or + perhaps the entire file system is out of space. + + HTTP Mapping: 429 Too Many Requests */ + ResourceExhausted = 8, + /* Operation was rejected because the system is not in a state + required for the operation's execution. For example, directory + to be deleted may be non-empty, an rmdir operation is applied to + a non-directory, etc. + + A litmus test that may help a service implementor in deciding + between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: + (a) Use UNAVAILABLE if the client can retry just the failing call. + (b) Use ABORTED if the client should retry at a higher-level + (e.g., restarting a read-modify-write sequence). + (c) Use FAILED_PRECONDITION if the client should not retry until + the system state has been explicitly fixed. E.g., if an "rmdir" + fails because the directory is non-empty, FAILED_PRECONDITION + should be returned since the client should not retry unless + they have first fixed up the directory by deleting files from it. + (d) Use FAILED_PRECONDITION if the client performs conditional + REST Get/Update/Delete on a resource and the resource on the + server does not match the condition. E.g., conflicting + read-modify-write on the same resource. + + HTTP Mapping: 400 Bad Request + + NOTE: HTTP spec says 412 Precondition Failed should only be used if + the request contains Etag related headers. So if the server does see + Etag related headers in the request, it may choose to return 412 + instead of 400 for this error code. */ + FailedPrecondition = 9, + /* The operation was aborted, typically due to a concurrency issue + like sequencer check failures, transaction aborts, etc. + + See litmus test above for deciding between FAILED_PRECONDITION, + ABORTED, and UNAVAILABLE. + + HTTP Mapping: 409 Conflict */ + Aborted = 10, + /* Operation was attempted past the valid range. E.g., seeking or + reading past end of file. + + Unlike INVALID_ARGUMENT, this error indicates a problem that may + be fixed if the system state changes. For example, a 32-bit file + system will generate INVALID_ARGUMENT if asked to read at an + offset that is not in the range [0,2^32-1], but it will generate + OUT_OF_RANGE if asked to read from an offset past the current + file size. + + There is a fair bit of overlap between FAILED_PRECONDITION and + OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific + error) when it applies so that callers who are iterating through + a space can easily look for an OUT_OF_RANGE error to detect when + they are done. + + HTTP Mapping: 400 Bad Request */ + OutOfRange = 11, + /* Operation is not implemented or not supported/enabled in this service. + + HTTP Mapping: 501 Not Implemented */ + Unimplemented = 12, + /* Internal errors. Means some invariants expected by underlying + system has been broken. If you see one of these errors, + something is very broken. + + HTTP Mapping: 500 Internal Server Error */ + Internal = 13, + /* The service is currently unavailable. This is a most likely a + transient condition and may be corrected by retrying with + a backoff. + + See litmus test above for deciding between FAILED_PRECONDITION, + ABORTED, and UNAVAILABLE. + + HTTP Mapping: 503 Service Unavailable */ + Unavailable = 14, + /* Unrecoverable data loss or corruption. + + HTTP Mapping: 500 Internal Server Error */ + DataLoss = 15 + } +} + diff --git a/src/csharp/Grpc.Core/Utils/RecordingObserver.cs b/src/csharp/Grpc.Core/Utils/RecordingObserver.cs new file mode 100644 index 0000000000..99d2725b70 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/RecordingObserver.cs @@ -0,0 +1,65 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Grpc.Core.Utils +{ + public class RecordingObserver : IObserver + { + TaskCompletionSource> tcs = new TaskCompletionSource>(); + List data = new List(); + + public void OnCompleted() + { + tcs.SetResult(data); + } + + public void OnError(Exception error) + { + tcs.SetException(error); + } + + public void OnNext(T value) + { + data.Add(value); + } + + public Task> ToList() { + return tcs.Task; + } + } +} + diff --git a/src/csharp/Grpc.Core/Utils/RecordingQueue.cs b/src/csharp/Grpc.Core/Utils/RecordingQueue.cs new file mode 100644 index 0000000000..63992da6a9 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/RecordingQueue.cs @@ -0,0 +1,84 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; + +namespace Grpc.Core.Utils +{ + // TODO: replace this by something that implements IAsyncEnumerator. + /// + /// Observer that allows us to await incoming messages one-by-one. + /// The implementation is not ideal and class will be probably replaced + /// by something more versatile in the future. + /// + public class RecordingQueue : IObserver + { + readonly BlockingCollection queue = new BlockingCollection(); + TaskCompletionSource tcs = new TaskCompletionSource(); + + public void OnCompleted() + { + tcs.SetResult(null); + } + + public void OnError(Exception error) + { + tcs.SetException(error); + } + + public void OnNext(T value) + { + queue.Add(value); + } + + public BlockingCollection Queue + { + get + { + return queue; + } + } + + public Task Finished + { + get + { + return tcs.Task; + } + } + } +} + -- cgit v1.2.3