diff options
Diffstat (limited to 'src/csharp/Grpc.Core')
32 files changed, 3478 insertions, 0 deletions
diff --git a/src/csharp/Grpc.Core/.gitignore b/src/csharp/Grpc.Core/.gitignore new file mode 100644 index 0000000000..8d4a6c08a8 --- /dev/null +++ b/src/csharp/Grpc.Core/.gitignore @@ -0,0 +1,2 @@ +bin +obj
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs new file mode 100644 index 0000000000..72dca68895 --- /dev/null +++ b/src/csharp/Grpc.Core/Call.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + public class Call<TRequest, TResponse> + { + readonly string methodName; + readonly Func<TRequest, byte[]> requestSerializer; + readonly Func<byte[], TResponse> responseDeserializer; + readonly Channel channel; + + public Call(string methodName, + Func<TRequest, byte[]> requestSerializer, + Func<byte[], TResponse> responseDeserializer, + TimeSpan timeout, + Channel channel) { + this.methodName = methodName; + this.requestSerializer = requestSerializer; + this.responseDeserializer = responseDeserializer; + this.channel = channel; + } + + public Call(Method<TRequest, TResponse> method, Channel channel) + { + this.methodName = method.Name; + this.requestSerializer = method.RequestMarshaller.Serializer; + this.responseDeserializer = method.ResponseMarshaller.Deserializer; + this.channel = channel; + } + + public Channel Channel + { + get + { + return this.channel; + } + } + + public string MethodName + { + get + { + return this.methodName; + } + } + + public Func<TRequest, byte[]> RequestSerializer + { + get + { + return this.requestSerializer; + } + } + + public Func<byte[], TResponse> ResponseDeserializer + { + get + { + return this.responseDeserializer; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs new file mode 100644 index 0000000000..b67332676a --- /dev/null +++ b/src/csharp/Grpc.Core/Calls.cs @@ -0,0 +1,108 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + // NOTE: this class is work-in-progress + + /// <summary> + /// Helper methods for generated stubs to make RPC calls. + /// </summary> + public static class Calls + { + public static TResponse BlockingUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + { + //TODO: implement this in real synchronous style. + try { + return AsyncUnaryCall(call, req, token).Result; + } catch(AggregateException ae) { + foreach (var e in ae.InnerExceptions) + { + if (e is RpcException) + { + throw e; + } + } + throw; + } + } + + public static async Task<TResponse> AsyncUnaryCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, CancellationToken token) + { + var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + return await asyncCall.UnaryCallAsync(req); + } + + public static void AsyncServerStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, TRequest req, IObserver<TResponse> outputs, CancellationToken token) + { + var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); + + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + asyncCall.StartServerStreamingCall(req, outputs); + } + + public static ClientStreamingAsyncResult<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, CancellationToken token) + { + var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + var task = asyncCall.ClientStreamingCallAsync(); + var inputs = new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); + return new ClientStreamingAsyncResult<TRequest, TResponse>(task, inputs); + } + + public static TResponse BlockingClientStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObservable<TRequest> inputs, CancellationToken token) + { + throw new NotImplementedException(); + } + + public static IObserver<TRequest> DuplexStreamingCall<TRequest, TResponse>(Call<TRequest, TResponse> call, IObserver<TResponse> outputs, CancellationToken token) + { + var asyncCall = new AsyncCall<TRequest, TResponse>(call.RequestSerializer, call.ResponseDeserializer); + asyncCall.Initialize(call.Channel, GetCompletionQueue(), call.MethodName); + + asyncCall.StartDuplexStreamingCall(outputs); + return new ClientStreamingInputObserver<TRequest, TResponse>(asyncCall); + } + + private static CompletionQueueSafeHandle GetCompletionQueue() { + return GrpcEnvironment.ThreadPool.CompletionQueue; + } + } +} + diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs new file mode 100644 index 0000000000..942651cf39 --- /dev/null +++ b/src/csharp/Grpc.Core/Channel.cs @@ -0,0 +1,85 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + public class Channel : IDisposable + { + readonly ChannelSafeHandle handle; + readonly String target; + + // TODO: add way how to create grpc_secure_channel.... + // TODO: add support for channel args... + public Channel(string target) + { + this.handle = ChannelSafeHandle.Create(target, IntPtr.Zero); + this.target = target; + } + + internal ChannelSafeHandle Handle + { + get + { + return this.handle; + } + } + + public string Target + { + get + { + return this.target; + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (handle != null && !handle.IsInvalid) + { + handle.Dispose(); + } + } + } +} diff --git a/src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs b/src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs new file mode 100644 index 0000000000..44580a1154 --- /dev/null +++ b/src/csharp/Grpc.Core/ClientStreamingAsyncResult.cs @@ -0,0 +1,70 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; + +namespace Grpc.Core +{ + /// <summary> + /// Return type for client streaming async method. + /// </summary> + public struct ClientStreamingAsyncResult<TRequest, TResponse> + { + readonly Task<TResponse> task; + readonly IObserver<TRequest> inputs; + + public ClientStreamingAsyncResult(Task<TResponse> task, IObserver<TRequest> inputs) + { + this.task = task; + this.inputs = inputs; + } + + public Task<TResponse> Task + { + get + { + return this.task; + } + } + + public IObserver<TRequest> Inputs + { + get + { + return this.inputs; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj new file mode 100644 index 0000000000..4ad32e10e4 --- /dev/null +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProductVersion>10.0.0</ProductVersion> + <SchemaVersion>2.0</SchemaVersion> + <ProjectGuid>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</ProjectGuid> + <OutputType>Library</OutputType> + <RootNamespace>Grpc.Core</RootNamespace> + <AssemblyName>Grpc.Core</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\Debug</OutputPath> + <DefineConstants>DEBUG;</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <ConsolePause>false</ConsolePause> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <DebugType>full</DebugType> + <Optimize>true</Optimize> + <OutputPath>bin\Release</OutputPath> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <ConsolePause>false</ConsolePause> + </PropertyGroup> + <ItemGroup> + <Reference Include="System" /> + </ItemGroup> + <ItemGroup> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="RpcException.cs" /> + <Compile Include="Calls.cs" /> + <Compile Include="Call.cs" /> + <Compile Include="ClientStreamingAsyncResult.cs" /> + <Compile Include="GrpcEnvironment.cs" /> + <Compile Include="Status.cs" /> + <Compile Include="StatusCode.cs" /> + <Compile Include="Server.cs" /> + <Compile Include="Channel.cs" /> + <Compile Include="Internal\CallSafeHandle.cs" /> + <Compile Include="Internal\ChannelSafeHandle.cs" /> + <Compile Include="Internal\CompletionQueueSafeHandle.cs" /> + <Compile Include="Internal\Enums.cs" /> + <Compile Include="Internal\SafeHandleZeroIsInvalid.cs" /> + <Compile Include="Internal\Timespec.cs" /> + <Compile Include="Internal\GrpcThreadPool.cs" /> + <Compile Include="Internal\AsyncCall.cs" /> + <Compile Include="Internal\ServerSafeHandle.cs" /> + <Compile Include="Method.cs" /> + <Compile Include="ServerCalls.cs" /> + <Compile Include="ServerCallHandler.cs" /> + <Compile Include="Marshaller.cs" /> + <Compile Include="ServerServiceDefinition.cs" /> + <Compile Include="Utils\RecordingObserver.cs" /> + <Compile Include="Utils\RecordingQueue.cs" /> + <Compile Include="Internal\ClientStreamingInputObserver.cs" /> + <Compile Include="Internal\ServerStreamingOutputObserver.cs" /> + <Compile Include="Internal\BatchContextSafeHandleNotOwned.cs" /> + </ItemGroup> + <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> + <ItemGroup> + <Folder Include="Internal\" /> + <Folder Include="Utils\" /> + </ItemGroup> +</Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec new file mode 100644 index 0000000000..af8a8869ca --- /dev/null +++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec @@ -0,0 +1,23 @@ +<?xml version="1.0" encoding="utf-8"?> +<package > + <metadata> + <id>Grpc.Core</id> + <title>gRPC Core</title> + <summary>Core C# implementation of gRPC - an RPC library and framework</summary> + <description>Core C# implementation of gRPC - an RPC library and framework. See project site for more info. + This is an experimental release, not ready to use. + </description> + <version>0.1.0</version> + <authors>Google Inc.</authors> + <owners>jtattermusch</owners> + <licenseUrl>https://github.com/grpc/grpc/blob/master/LICENSE</licenseUrl> + <projectUrl>https://github.com/grpc/grpc</projectUrl> + <requireLicenseAcceptance>false</requireLicenseAcceptance> + <releaseNotes>The first experimental release. Not ready to use.</releaseNotes> + <copyright>Copyright 2015, Google Inc.</copyright> + <tags>gRPC RPC Protocol HTTP/2</tags> + </metadata> + <files> + <file src="bin/Release/Grpc.Core.dll" target="lib/net45" /> + </files> +</package> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs new file mode 100644 index 0000000000..0e3a0a581c --- /dev/null +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -0,0 +1,135 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + /// <summary> + /// Encapsulates initialization and shutdown of gRPC library. + /// </summary> + public class GrpcEnvironment + { + const int THREAD_POOL_SIZE = 4; + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_init(); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_shutdown(); + + static object staticLock = new object(); + static volatile GrpcEnvironment instance; + + readonly GrpcThreadPool threadPool; + bool isClosed; + + /// <summary> + /// Makes sure GRPC environment is initialized. Subsequent invocations don't have any + /// effect unless you call Shutdown first. + /// Although normal use cases assume you will call this just once in your application's + /// lifetime (and call Shutdown once you're done), for the sake of easier testing it's + /// allowed to initialize the environment again after it has been successfully shutdown. + /// </summary> + public static void Initialize() { + lock(staticLock) + { + if (instance == null) + { + instance = new GrpcEnvironment(); + } + } + } + + /// <summary> + /// Shuts down the GRPC environment if it was initialized before. + /// Repeated invocations have no effect. + /// </summary> + public static void Shutdown() + { + lock(staticLock) + { + if (instance != null) + { + instance.Close(); + instance = null; + } + } + } + + internal static GrpcThreadPool ThreadPool + { + get + { + var inst = instance; + if (inst == null) + { + throw new InvalidOperationException("GRPC environment not initialized"); + } + return inst.threadPool; + } + } + + /// <summary> + /// Creates gRPC environment. + /// </summary> + private GrpcEnvironment() + { + grpcsharp_init(); + threadPool = new GrpcThreadPool(THREAD_POOL_SIZE); + threadPool.Start(); + // TODO: use proper logging here + Console.WriteLine("GRPC initialized."); + } + + /// <summary> + /// Shuts down this environment. + /// </summary> + private void Close() + { + if (isClosed) + { + throw new InvalidOperationException("Close has already been called"); + } + threadPool.Stop(); + grpcsharp_shutdown(); + isClosed = true; + + // TODO: use proper logging here + Console.WriteLine("GRPC shutdown."); + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs new file mode 100644 index 0000000000..5e96092e27 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -0,0 +1,616 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Handles native call lifecycle and provides convenience methods. + /// </summary> + internal class AsyncCall<TWrite, TRead> + { + readonly Func<TWrite, byte[]> serializer; + readonly Func<byte[], TRead> deserializer; + + readonly CompletionCallbackDelegate unaryResponseHandler; + readonly CompletionCallbackDelegate finishedHandler; + readonly CompletionCallbackDelegate writeFinishedHandler; + readonly CompletionCallbackDelegate readFinishedHandler; + readonly CompletionCallbackDelegate halfclosedHandler; + readonly CompletionCallbackDelegate finishedServersideHandler; + + object myLock = new object(); + GCHandle gchandle; + CallSafeHandle call; + bool disposed; + + bool server; + + bool started; + bool errorOccured; + bool cancelRequested; + bool readingDone; + bool halfcloseRequested; + bool halfclosed; + bool finished; + + // Completion of a pending write if not null. + TaskCompletionSource<object> writeTcs; + + // Completion of a pending read if not null. + TaskCompletionSource<TRead> readTcs; + + // Completion of a pending halfclose if not null. + TaskCompletionSource<object> halfcloseTcs; + + // Completion of a pending unary response if not null. + TaskCompletionSource<TRead> unaryResponseTcs; + + // Set after status is received on client. Only used for server streaming and duplex streaming calls. + Nullable<Status> finishedStatus; + TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); + + // For streaming, the reads will be delivered to this observer. + IObserver<TRead> readObserver; + + public AsyncCall(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + { + this.serializer = serializer; + this.deserializer = deserializer; + this.unaryResponseHandler = HandleUnaryResponse; + this.finishedHandler = HandleFinished; + this.writeFinishedHandler = HandleWriteFinished; + this.readFinishedHandler = HandleReadFinished; + this.halfclosedHandler = HandleHalfclosed; + this.finishedServersideHandler = HandleFinishedServerside; + } + + public void Initialize(Channel channel, CompletionQueueSafeHandle cq, String methodName) + { + InitializeInternal(CallSafeHandle.Create(channel.Handle, cq, methodName, channel.Target, Timespec.InfFuture), false); + } + + public void InitializeServer(CallSafeHandle call) + { + InitializeInternal(call, true); + } + + public Task<TRead> UnaryCallAsync(TWrite msg) + { + lock (myLock) + { + started = true; + halfcloseRequested = true; + readingDone = true; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + unaryResponseTcs = new TaskCompletionSource<TRead>(); + call.StartUnary(payload, unaryResponseHandler); + + return unaryResponseTcs.Task; + } + } + + public Task<TRead> ClientStreamingCallAsync() + { + lock (myLock) + { + started = true; + readingDone = true; + + unaryResponseTcs = new TaskCompletionSource<TRead>(); + call.StartClientStreaming(unaryResponseHandler); + + return unaryResponseTcs.Task; + } + } + + public void StartServerStreamingCall(TWrite msg, IObserver<TRead> readObserver) + { + lock (myLock) + { + started = true; + halfcloseRequested = true; + + this.readObserver = readObserver; + + // TODO: handle serialization error... + byte[] payload = serializer(msg); + + call.StartServerStreaming(payload, finishedHandler); + + ReceiveMessageAsync(); + } + } + + public void StartDuplexStreamingCall(IObserver<TRead> readObserver) + { + lock (myLock) + { + started = true; + + this.readObserver = readObserver; + + call.StartDuplexStreaming(finishedHandler); + + ReceiveMessageAsync(); + } + } + + public Task ServerSideUnaryRequestCallAsync() + { + lock (myLock) + { + started = true; + call.StartServerSide(finishedServersideHandler); + return finishedServersideTcs.Task; + } + } + + public Task ServerSideStreamingRequestCallAsync(IObserver<TRead> readObserver) + { + lock (myLock) + { + started = true; + call.StartServerSide(finishedServersideHandler); + + if (this.readObserver != null) + { + throw new InvalidOperationException("Already registered an observer."); + } + this.readObserver = readObserver; + ReceiveMessageAsync(); + + return finishedServersideTcs.Task; + } + } + + public Task SendMessageAsync(TWrite msg) + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (halfcloseRequested) + { + throw new InvalidOperationException("Already halfclosed."); + } + + if (writeTcs != null) + { + throw new InvalidOperationException("Only one write can be pending at a time"); + } + + // TODO: wrap serialization... + byte[] payload = serializer(msg); + + call.StartSendMessage(payload, writeFinishedHandler); + writeTcs = new TaskCompletionSource<object>(); + return writeTcs.Task; + } + } + + public Task SendCloseFromClientAsync() + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (halfcloseRequested) + { + throw new InvalidOperationException("Already halfclosed."); + } + + call.StartSendCloseFromClient(halfclosedHandler); + + halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource<object>(); + return halfcloseTcs.Task; + } + } + + public Task SendStatusFromServerAsync(Status status) + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (halfcloseRequested) + { + throw new InvalidOperationException("Already halfclosed."); + } + + call.StartSendStatusFromServer(status, halfclosedHandler); + halfcloseRequested = true; + halfcloseTcs = new TaskCompletionSource<object>(); + return halfcloseTcs.Task; + } + } + + public Task<TRead> ReceiveMessageAsync() + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + CheckNoError(); + + if (readingDone) + { + throw new InvalidOperationException("Already read the last message."); + } + + if (readTcs != null) + { + throw new InvalidOperationException("Only one read can be pending at a time"); + } + + call.StartReceiveMessage(readFinishedHandler); + + readTcs = new TaskCompletionSource<TRead>(); + return readTcs.Task; + } + } + + public void Cancel() + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + cancelRequested = true; + } + // grpc_call_cancel is threadsafe + call.Cancel(); + } + + public void CancelWithStatus(Status status) + { + lock (myLock) + { + CheckNotDisposed(); + CheckStarted(); + cancelRequested = true; + } + // grpc_call_cancel_with_status is threadsafe + call.CancelWithStatus(status); + } + + private void InitializeInternal(CallSafeHandle call, bool server) + { + lock (myLock) + { + // Make sure this object and the delegated held by it will not be garbage collected + // before we release this handle. + gchandle = GCHandle.Alloc(this); + this.call = call; + this.server = server; + } + } + + private void CheckStarted() + { + if (!started) + { + throw new InvalidOperationException("Call not started"); + } + } + + private void CheckNotDisposed() + { + if (disposed) + { + throw new InvalidOperationException("Call has already been disposed."); + } + } + + private void CheckNoError() + { + if (errorOccured) + { + throw new InvalidOperationException("Error occured when processing call."); + } + } + + private bool ReleaseResourcesIfPossible() + { + if (!disposed && call != null) + { + if (halfclosed && readingDone && finished) + { + ReleaseResources(); + return true; + } + } + return false; + } + + private void ReleaseResources() + { + if (call != null) { + call.Dispose(); + } + gchandle.Free(); + disposed = true; + } + + private void CompleteStreamObserver(Status status) + { + if (status.StatusCode != StatusCode.OK) + { + // TODO: wrap to handle exceptions; + readObserver.OnError(new RpcException(status)); + } else { + // TODO: wrap to handle exceptions; + readObserver.OnCompleted(); + } + } + + /// <summary> + /// Handler for unary response completion. + /// </summary> + private void HandleUnaryResponse(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + TaskCompletionSource<TRead> tcs; + lock(myLock) + { + finished = true; + halfclosed = true; + tcs = unaryResponseTcs; + + ReleaseResourcesIfPossible(); + } + + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) + { + tcs.SetException(new RpcException( + new Status(StatusCode.Internal, "Internal error occured.") + )); + return; + } + + var status = ctx.GetReceivedStatus(); + if (status.StatusCode != StatusCode.OK) + { + tcs.SetException(new RpcException(status)); + return; + } + + // TODO: handle deserialize error... + var msg = deserializer(ctx.GetReceivedMessage()); + tcs.SetResult(msg); + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleWriteFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + TaskCompletionSource<object> oldTcs = null; + lock (myLock) + { + oldTcs = writeTcs; + writeTcs = null; + } + + if (errorOccured) + { + // TODO: use the right type of exception... + oldTcs.SetException(new Exception("Write failed")); + } + else + { + // TODO: where does the continuation run? + oldTcs.SetResult(null); + } + + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleHalfclosed(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + lock (myLock) + { + halfclosed = true; + + ReleaseResourcesIfPossible(); + } + + if (error != GRPCOpError.GRPC_OP_OK) + { + halfcloseTcs.SetException(new Exception("Halfclose failed")); + + } + else + { + halfcloseTcs.SetResult(null); + } + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleReadFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var payload = ctx.GetReceivedMessage(); + + TaskCompletionSource<TRead> oldTcs = null; + IObserver<TRead> observer = null; + + Nullable<Status> status = null; + + lock (myLock) + { + oldTcs = readTcs; + readTcs = null; + if (payload == null) + { + readingDone = true; + } + observer = readObserver; + status = finishedStatus; + } + + // TODO: wrap deserialization... + TRead msg = payload != null ? deserializer(payload) : default(TRead); + + oldTcs.SetResult(msg); + + // TODO: make sure we deliver reads in the right order. + + if (observer != null) + { + if (payload != null) + { + // TODO: wrap to handle exceptions + observer.OnNext(msg); + + // start a new read + ReceiveMessageAsync(); + } + else + { + if (!server) + { + if (status.HasValue) + { + CompleteStreamObserver(status.Value); + } + } + else + { + // TODO: wrap to handle exceptions.. + observer.OnCompleted(); + } + // TODO: completeStreamObserver serverside... + } + } + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinished(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + var status = ctx.GetReceivedStatus(); + + bool wasReadingDone; + + lock (myLock) + { + finished = true; + finishedStatus = status; + + wasReadingDone = readingDone; + + ReleaseResourcesIfPossible(); + } + + if (wasReadingDone) { + CompleteStreamObserver(status); + } + + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleFinishedServerside(GRPCOpError error, IntPtr batchContextPtr) + { + try + { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + lock(myLock) + { + finished = true; + + // TODO: because of the way server calls are implemented, we need to set + // reading done to true here. Should be fixed in the future. + readingDone = true; + + ReleaseResourcesIfPossible(); + } + // TODO: handle error ... + + finishedServersideTcs.SetResult(null); + + } + catch(Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + } +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs new file mode 100644 index 0000000000..75cd30e1a2 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandleNotOwned.cs @@ -0,0 +1,96 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using Grpc.Core; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Not owned version of + /// grpcsharp_batch_context + /// </summary> + internal class BatchContextSafeHandleNotOwned : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandleNotOwned ctx, byte[] buffer, UIntPtr bufferLen); + + [DllImport("grpc_csharp_ext.dll")] + static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandleNotOwned ctx); // returns const char* + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_batch_context_server_rpc_new_call(BatchContextSafeHandleNotOwned ctx); + + [DllImport("grpc_csharp_ext.dll")] + static extern IntPtr grpcsharp_batch_context_server_rpc_new_method(BatchContextSafeHandleNotOwned ctx); // returns const char* + + public BatchContextSafeHandleNotOwned(IntPtr handle) : base(false) + { + SetHandle(handle); + } + + public Status GetReceivedStatus() + { + // TODO: can the native method return string directly? + string details = Marshal.PtrToStringAnsi(grpcsharp_batch_context_recv_status_on_client_details(this)); + return new Status(grpcsharp_batch_context_recv_status_on_client_status(this), details); + } + + public byte[] GetReceivedMessage() + { + IntPtr len = grpcsharp_batch_context_recv_message_length(this); + if (len == new IntPtr(-1)) + { + return null; + } + byte[] data = new byte[(int) len]; + grpcsharp_batch_context_recv_message_to_buffer(this, data, new UIntPtr((ulong)data.Length)); + return data; + } + + public CallSafeHandle GetServerRpcNewCall() { + return grpcsharp_batch_context_server_rpc_new_call(this); + } + + public string GetServerRpcNewMethod() { + return Marshal.PtrToStringAnsi(grpcsharp_batch_context_server_rpc_new_method(this)); + } + } +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs new file mode 100644 index 0000000000..659a383b4b --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -0,0 +1,181 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.InteropServices; +using Grpc.Core; + +namespace Grpc.Core.Internal +{ + //TODO: rename the delegate + internal delegate void CompletionCallbackDelegate(GRPCOpError error, IntPtr batchContextPtr); + + /// <summary> + /// grpc_call from <grpc/grpc.h> + /// </summary> + internal class CallSafeHandle : SafeHandleZeroIsInvalid + { + const UInt32 GRPC_WRITE_BUFFER_HINT = 1; + + [DllImport("grpc_csharp_ext.dll")] + static extern CallSafeHandle grpcsharp_channel_create_call(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_cancel(CallSafeHandle call); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_cancel_with_status(CallSafeHandle call, StatusCode status, string description); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_duplex_streaming(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, + byte[] send_buffer, UIntPtr send_buffer_len); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback, StatusCode statusCode, string statusMessage); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, + [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_call_destroy(IntPtr call); + + + private CallSafeHandle() + { + } + + public static CallSafeHandle Create(ChannelSafeHandle channel, CompletionQueueSafeHandle cq, string method, string host, Timespec deadline) + { + return grpcsharp_channel_create_call(channel, cq, method, host, deadline); + } + + public void StartUnary(byte[] payload, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_unary(this, callback, payload, new UIntPtr((ulong) payload.Length))); + } + + public void StartClientStreaming(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_client_streaming(this, callback)); + } + + public void StartServerStreaming(byte[] payload, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_server_streaming(this, callback, payload, new UIntPtr((ulong) payload.Length))); + } + + public void StartDuplexStreaming(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_duplex_streaming(this, callback)); + } + + public void StartSendMessage(byte[] payload, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_message(this, callback, payload, new UIntPtr((ulong) payload.Length))); + } + + public void StartSendCloseFromClient(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_close_from_client(this, callback)); + } + + public void StartSendStatusFromServer(Status status, CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_send_status_from_server(this, callback, status.StatusCode, status.Detail)); + } + + public void StartReceiveMessage(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_recv_message(this, callback)); + } + + public void StartServerSide(CompletionCallbackDelegate callback) + { + AssertCallOk(grpcsharp_call_start_serverside(this, callback)); + } + + public void Cancel() + { + AssertCallOk(grpcsharp_call_cancel(this)); + } + + public void CancelWithStatus(Status status) + { + AssertCallOk(grpcsharp_call_cancel_with_status(this, status.StatusCode, status.Detail)); + } + + protected override bool ReleaseHandle() + { + grpcsharp_call_destroy(handle); + return true; + } + + private static void AssertCallOk(GRPCCallError callError) + { + Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } + + private static UInt32 GetFlags(bool buffered) { + return buffered ? 0 : GRPC_WRITE_BUFFER_HINT; + } + } +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs new file mode 100644 index 0000000000..f15ead3572 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -0,0 +1,67 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// grpc_channel from <grpc/grpc.h> + /// </summary> + internal class ChannelSafeHandle : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern ChannelSafeHandle grpcsharp_channel_create(string target, IntPtr channelArgs); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_channel_destroy(IntPtr channel); + + private ChannelSafeHandle() + { + } + + public static ChannelSafeHandle Create(string target, IntPtr channelArgs) + { + return grpcsharp_channel_create(target, channelArgs); + } + + protected override bool ReleaseHandle() + { + grpcsharp_channel_destroy(handle); + return true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs new file mode 100644 index 0000000000..fb59e86e2d --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ClientStreamingInputObserver.cs @@ -0,0 +1,67 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + internal class ClientStreamingInputObserver<TWrite, TRead> : IObserver<TWrite> + { + readonly AsyncCall<TWrite, TRead> call; + + public ClientStreamingInputObserver(AsyncCall<TWrite, TRead> call) + { + this.call = call; + } + + public void OnCompleted() + { + + // TODO: how bad is the Wait here? + call.SendCloseFromClientAsync().Wait(); + } + + public void OnError(Exception error) + { + throw new InvalidOperationException("This should never be called."); + } + + public void OnNext(TWrite value) + { + // TODO: how bad is the Wait here? + call.SendMessageAsync(value).Wait(); + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs new file mode 100644 index 0000000000..3f01fdbfd0 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/CompletionQueueSafeHandle.cs @@ -0,0 +1,83 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// grpc_completion_queue from <grpc/grpc.h> + /// </summary> + internal class CompletionQueueSafeHandle : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern CompletionQueueSafeHandle grpcsharp_completion_queue_create(); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_completion_queue_shutdown(CompletionQueueSafeHandle cq); + + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCompletionType grpcsharp_completion_queue_next_with_callback(CompletionQueueSafeHandle cq); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_completion_queue_destroy(IntPtr cq); + + private CompletionQueueSafeHandle() + { + } + + public static CompletionQueueSafeHandle Create() + { + return grpcsharp_completion_queue_create(); + } + + public GRPCCompletionType NextWithCallback() + { + return grpcsharp_completion_queue_next_with_callback(this); + } + + public void Shutdown() + { + grpcsharp_completion_queue_shutdown(this); + } + + protected override bool ReleaseHandle() + { + grpcsharp_completion_queue_destroy(handle); + return true; + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/Enums.cs b/src/csharp/Grpc.Core/Internal/Enums.cs new file mode 100644 index 0000000000..f363050b07 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/Enums.cs @@ -0,0 +1,115 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// from grpc/grpc.h + /// </summary> + internal enum GRPCCallError + { + /* everything went ok */ + GRPC_CALL_OK = 0, + /* something failed, we don't know what */ + GRPC_CALL_ERROR, + /* this method is not available on the server */ + GRPC_CALL_ERROR_NOT_ON_SERVER, + /* this method is not available on the client */ + GRPC_CALL_ERROR_NOT_ON_CLIENT, + /* this method must be called before server_accept */ + GRPC_CALL_ERROR_ALREADY_ACCEPTED, + /* this method must be called before invoke */ + GRPC_CALL_ERROR_ALREADY_INVOKED, + /* this method must be called after invoke */ + GRPC_CALL_ERROR_NOT_INVOKED, + /* this call is already finished + (writes_done or write_status has already been called) */ + GRPC_CALL_ERROR_ALREADY_FINISHED, + /* there is already an outstanding read/write operation on the call */ + GRPC_CALL_ERROR_TOO_MANY_OPERATIONS, + /* the flags value was illegal for this call */ + GRPC_CALL_ERROR_INVALID_FLAGS + } + + /// <summary> + /// grpc_completion_type from grpc/grpc.h + /// </summary> + internal enum GRPCCompletionType + { + /* Shutting down */ + GRPC_QUEUE_SHUTDOWN, + + /* operation completion */ + GRPC_OP_COMPLETE, + + /* A read has completed */ + GRPC_READ, + + /* A write has been accepted by flow control */ + GRPC_WRITE_ACCEPTED, + + /* writes_done or write_status has been accepted */ + GRPC_FINISH_ACCEPTED, + + /* The metadata array sent by server received at client */ + GRPC_CLIENT_METADATA_READ, + + /* An RPC has finished. The event contains status. + * On the server this will be OK or Cancelled. */ + GRPC_FINISHED, + + /* A new RPC has arrived at the server */ + GRPC_SERVER_RPC_NEW, + + /* The server has finished shutting down */ + GRPC_SERVER_SHUTDOWN, + + /* must be last, forces users to include a default: case */ + GRPC_COMPLETION_DO_NOT_USE + } + + /// <summary> + /// grpc_op_error from grpc/grpc.h + /// </summary> + internal enum GRPCOpError + { + /* everything went ok */ + GRPC_OP_OK = 0, + /* something failed, we don't know what */ + GRPC_OP_ERROR + } +} + diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs new file mode 100644 index 0000000000..9e69fe2f43 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -0,0 +1,125 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Pool of threads polling on the same completion queue. + /// </summary> + internal class GrpcThreadPool + { + readonly object myLock = new object(); + readonly List<Thread> threads = new List<Thread>(); + readonly int poolSize; + + CompletionQueueSafeHandle cq; + + public GrpcThreadPool(int poolSize) { + this.poolSize = poolSize; + } + + public void Start() { + + lock (myLock) + { + if (cq != null) + { + throw new InvalidOperationException("Already started."); + } + + cq = CompletionQueueSafeHandle.Create(); + + for (int i = 0; i < poolSize; i++) + { + threads.Add(CreateAndStartThread(i)); + } + } + } + + public void Stop() { + + lock (myLock) + { + cq.Shutdown(); + + Console.WriteLine("Waiting for GPRC threads to finish."); + foreach (var thread in threads) + { + thread.Join(); + } + + cq.Dispose(); + + } + } + + internal CompletionQueueSafeHandle CompletionQueue + { + get + { + return cq; + } + } + + private Thread CreateAndStartThread(int i) + { + var thread = new Thread(new ThreadStart(RunHandlerLoop)); + thread.IsBackground = false; + thread.Start(); + thread.Name = "grpc " + i; + return thread; + } + + /// <summary> + /// Body of the polling thread. + /// </summary> + private void RunHandlerLoop() + { + GRPCCompletionType completionType; + do + { + completionType = cq.NextWithCallback(); + } while(completionType != GRPCCompletionType.GRPC_QUEUE_SHUTDOWN); + Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting."); + } + } + +} + diff --git a/src/csharp/Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs b/src/csharp/Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs new file mode 100644 index 0000000000..aa6fce2e96 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/SafeHandleZeroIsInvalid.cs @@ -0,0 +1,67 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Safe handle to wrap native objects. + /// </summary> + internal abstract class SafeHandleZeroIsInvalid : SafeHandle + { + public SafeHandleZeroIsInvalid() : base(IntPtr.Zero, true) + { + } + + public SafeHandleZeroIsInvalid(bool ownsHandle) : base(IntPtr.Zero, ownsHandle) + { + } + + public override bool IsInvalid + { + get + { + return handle == IntPtr.Zero; + } + } + + protected override bool ReleaseHandle() + { + // handle is not owned. + return true; + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs new file mode 100644 index 0000000000..de9bbaf7c1 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -0,0 +1,112 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Runtime.InteropServices; + +namespace Grpc.Core.Internal +{ + // TODO: we need to make sure that the delegates are not collected before invoked. + internal delegate void ServerShutdownCallbackDelegate(IntPtr eventPtr); + + /// <summary> + /// grpc_server from grpc/grpc.h + /// </summary> + internal sealed class ServerSafeHandle : SafeHandleZeroIsInvalid + { + [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_server_request_call(ServerSafeHandle server, CompletionQueueSafeHandle cq, [MarshalAs(UnmanagedType.FunctionPtr)] CompletionCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern ServerSafeHandle grpcsharp_server_create(CompletionQueueSafeHandle cq, IntPtr args); + + [DllImport("grpc_csharp_ext.dll")] + static extern Int32 grpcsharp_server_add_http2_port(ServerSafeHandle server, string addr); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_start(ServerSafeHandle server); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_shutdown(ServerSafeHandle server); + + // TODO: get rid of the old callback style + [DllImport("grpc_csharp_ext.dll", EntryPoint = "grpcsharp_server_shutdown_and_notify")] + static extern void grpcsharp_server_shutdown_and_notify_CALLBACK(ServerSafeHandle server, [MarshalAs(UnmanagedType.FunctionPtr)] ServerShutdownCallbackDelegate callback); + + [DllImport("grpc_csharp_ext.dll")] + static extern void grpcsharp_server_destroy(IntPtr server); + + private ServerSafeHandle() + { + } + + public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, IntPtr args) + { + // TODO: also grpc_secure_server_create... + return grpcsharp_server_create(cq, args); + } + + public int AddPort(string addr) + { + return grpcsharp_server_add_http2_port(this, addr); + } + + public void Start() + { + grpcsharp_server_start(this); + } + + public void Shutdown() + { + grpcsharp_server_shutdown(this); + } + + public void ShutdownAndNotify(ServerShutdownCallbackDelegate callback) + { + grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); + } + + public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) + { + return grpcsharp_server_request_call(this, cq, callback); + } + + protected override bool ReleaseHandle() + { + grpcsharp_server_destroy(handle); + return true; + } + } +} diff --git a/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs new file mode 100644 index 0000000000..08d9921475 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/ServerStreamingOutputObserver.cs @@ -0,0 +1,71 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// Observer that writes all arriving messages to a call abstraction (in blocking fashion) + /// and then halfcloses the call. Used for server-side call handling. + /// </summary> + internal class ServerStreamingOutputObserver<TWrite, TRead> : IObserver<TWrite> + { + readonly AsyncCall<TWrite, TRead> call; + + public ServerStreamingOutputObserver(AsyncCall<TWrite, TRead> call) + { + this.call = call; + } + + public void OnCompleted() + { + // TODO: how bad is the Wait here? + call.SendStatusFromServerAsync(new Status(StatusCode.OK, "")).Wait(); + } + + public void OnError(Exception error) + { + // TODO: implement this... + throw new InvalidOperationException("This should never be called."); + } + + public void OnNext(TWrite value) + { + // TODO: how bad is the Wait here? + call.SendMessageAsync(value).Wait(); + } + } +} + diff --git a/src/csharp/Grpc.Core/Internal/Timespec.cs b/src/csharp/Grpc.Core/Internal/Timespec.cs new file mode 100644 index 0000000000..b191ecde94 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/Timespec.cs @@ -0,0 +1,114 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading; + +namespace Grpc.Core.Internal +{ + /// <summary> + /// gpr_timespec from grpc/support/time.h + /// </summary> + [StructLayout(LayoutKind.Sequential)] + internal struct Timespec + { + const int nanosPerSecond = 1000 * 1000 * 1000; + const int nanosPerTick = 100; + + [DllImport("grpc_csharp_ext.dll")] + static extern Timespec gprsharp_now(); + + [DllImport("grpc_csharp_ext.dll")] + static extern Timespec gprsharp_inf_future(); + + [DllImport("grpc_csharp_ext.dll")] + static extern int gprsharp_sizeof_timespec(); + + // TODO: revisit this. + // NOTE: on linux 64bit sizeof(gpr_timespec) = 16, on windows 32bit sizeof(gpr_timespec) = 8 + // so IntPtr seems to have the right size to work on both. + public System.IntPtr tv_sec; + public System.IntPtr tv_nsec; + + /// <summary> + /// Timespec a long time in the future. + /// </summary> + public static Timespec InfFuture + { + get + { + return gprsharp_inf_future(); + } + } + + public static Timespec Now + { + get + { + return gprsharp_now(); + } + } + + internal static int NativeSize + { + get + { + return gprsharp_sizeof_timespec(); + } + } + + /// <summary> + /// Creates a GPR deadline from current instant and given timeout. + /// </summary> + /// <returns>The from timeout.</returns> + public static Timespec DeadlineFromTimeout(TimeSpan timeout) { + if (timeout == Timeout.InfiniteTimeSpan) + { + return Timespec.InfFuture; + } + return Timespec.Now.Add(timeout); + } + + public Timespec Add(TimeSpan timeSpan) { + long nanos = tv_nsec.ToInt64() + (timeSpan.Ticks % TimeSpan.TicksPerSecond) * nanosPerTick; + long overflow_sec = (nanos > nanosPerSecond) ? 1 : 0; + + Timespec result; + result.tv_nsec = new IntPtr(nanos % nanosPerSecond); + result.tv_sec = new IntPtr(tv_sec.ToInt64() + (timeSpan.Ticks / TimeSpan.TicksPerSecond) + overflow_sec); + return result; + } + } +} + diff --git a/src/csharp/Grpc.Core/Marshaller.cs b/src/csharp/Grpc.Core/Marshaller.cs new file mode 100644 index 0000000000..602e0eb824 --- /dev/null +++ b/src/csharp/Grpc.Core/Marshaller.cs @@ -0,0 +1,87 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + /// <summary> + /// For serializing and deserializing messages. + /// </summary> + public struct Marshaller<T> + { + readonly Func<T,byte[]> serializer; + readonly Func<byte[],T> deserializer; + + public Marshaller(Func<T, byte[]> serializer, Func<byte[], T> deserializer) + { + this.serializer = serializer; + this.deserializer = deserializer; + } + + public Func<T, byte[]> Serializer + { + get + { + return this.serializer; + } + } + + public Func<byte[], T> Deserializer + { + get + { + return this.deserializer; + } + } + } + + public static class Marshallers { + + public static Marshaller<T> Create<T>(Func<T,byte[]> serializer, Func<byte[],T> deserializer) + { + return new Marshaller<T>(serializer, deserializer); + } + + public static Marshaller<string> StringMarshaller + { + get + { + return new Marshaller<string>(System.Text.Encoding.UTF8.GetBytes, + System.Text.Encoding.UTF8.GetString); + } + } + + } +} + diff --git a/src/csharp/Grpc.Core/Method.cs b/src/csharp/Grpc.Core/Method.cs new file mode 100644 index 0000000000..c94aa8161f --- /dev/null +++ b/src/csharp/Grpc.Core/Method.cs @@ -0,0 +1,97 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + public enum MethodType + { + Unary, + ClientStreaming, + ServerStreaming, + DuplexStreaming + } + + /// <summary> + /// A description of a service method. + /// </summary> + public class Method<TRequest, TResponse> + { + readonly MethodType type; + readonly string name; + readonly Marshaller<TRequest> requestMarshaller; + readonly Marshaller<TResponse> responseMarshaller; + + public Method(MethodType type, string name, Marshaller<TRequest> requestMarshaller, Marshaller<TResponse> responseMarshaller) + { + this.type = type; + this.name = name; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + } + + public MethodType Type + { + get + { + return this.type; + } + } + + public string Name + { + get + { + return this.name; + } + } + + public Marshaller<TRequest> RequestMarshaller + { + get + { + return this.requestMarshaller; + } + } + + public Marshaller<TResponse> ResponseMarshaller + { + get + { + return this.responseMarshaller; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..37ba1e2263 --- /dev/null +++ b/src/csharp/Grpc.Core/Properties/AssemblyInfo.cs @@ -0,0 +1,24 @@ +using System.Reflection; +using System.Runtime.CompilerServices; + +// Information about this assembly is defined by the following attributes. +// Change them to the values specific to your project. +[assembly: AssemblyTitle ("Grpc.Core")] +[assembly: AssemblyDescription ("")] +[assembly: AssemblyConfiguration ("")] +[assembly: AssemblyCompany ("")] +[assembly: AssemblyProduct ("")] +[assembly: AssemblyCopyright ("Google Inc. All rights reserved.")] +[assembly: AssemblyTrademark ("")] +[assembly: AssemblyCulture ("")] +// The assembly version has the format "{Major}.{Minor}.{Build}.{Revision}". +// The form "{Major}.{Minor}.*" will automatically update the build and revision, +// and "{Major}.{Minor}.{Build}.*" will update just the revision. +[assembly: AssemblyVersion ("0.1.*")] +// The following attributes are used to specify the signing key for the assembly, +// if desired. See the Mono documentation for more information about signing. +//[assembly: AssemblyDelaySign(false)] +//[assembly: AssemblyKeyFile("")] + +[assembly: InternalsVisibleTo("Grpc.Core.Tests")] + diff --git a/src/csharp/Grpc.Core/RpcException.cs b/src/csharp/Grpc.Core/RpcException.cs new file mode 100644 index 0000000000..5a9d0039bc --- /dev/null +++ b/src/csharp/Grpc.Core/RpcException.cs @@ -0,0 +1,60 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + public class RpcException : Exception + { + private readonly Status status; + + public RpcException(Status status) + { + this.status = status; + } + + public RpcException(Status status, string message) : base(message) + { + this.status = status; + } + + public Status Status { + get + { + return status; + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs new file mode 100644 index 0000000000..002592a3d8 --- /dev/null +++ b/src/csharp/Grpc.Core/Server.cs @@ -0,0 +1,213 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading.Tasks; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + /// <summary> + /// Server is implemented only to be able to do + /// in-process testing. + /// </summary> + public class Server + { + // TODO: make sure the delegate doesn't get garbage collected while + // native callbacks are in the completion queue. + readonly ServerShutdownCallbackDelegate serverShutdownHandler; + readonly CompletionCallbackDelegate newServerRpcHandler; + + readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); + readonly ServerSafeHandle handle; + + readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); + + readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); + + public Server() + { + this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); + this.newServerRpcHandler = HandleNewServerRpc; + this.serverShutdownHandler = HandleServerShutdown; + } + + // only call this before Start() + public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) { + foreach(var entry in serviceDefinition.CallHandlers) + { + callHandlers.Add(entry.Key, entry.Value); + } + } + + // only call before Start() + public int AddPort(string addr) { + return handle.AddPort(addr); + } + + public void Start() + { + handle.Start(); + + // TODO: this basically means the server is single threaded.... + StartHandlingRpcs(); + } + + /// <summary> + /// Requests and handles single RPC call. + /// </summary> + internal void RunRpc() + { + AllowOneRpc(); + + try + { + var rpcInfo = newRpcQueue.Take(); + + //Console.WriteLine("Server received RPC " + rpcInfo.Method); + + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) + { + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); + } + catch(Exception e) + { + Console.WriteLine("Exception while handling RPC: " + e); + } + } + + /// <summary> + /// Requests server shutdown and when there are no more calls being serviced, + /// cleans up used resources. + /// </summary> + /// <returns>The async.</returns> + public async Task ShutdownAsync() { + handle.ShutdownAndNotify(serverShutdownHandler); + await shutdownTcs.Task; + handle.Dispose(); + } + + public void Kill() { + handle.Dispose(); + } + + private async Task StartHandlingRpcs() { + while (true) + { + await Task.Factory.StartNew(RunRpc); + } + } + + private void AllowOneRpc() + { + AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); + } + + private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { + try { + var ctx = new BatchContextSafeHandleNotOwned(batchContextPtr); + + if (error != GRPCOpError.GRPC_OP_OK) { + // TODO: handle error + } + + var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); + + // after server shutdown, the callback returns with null call + if (!rpcInfo.Call.IsInvalid) { + newRpcQueue.Add(rpcInfo); + } + + } catch(Exception e) { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private void HandleServerShutdown(IntPtr eventPtr) + { + try + { + shutdownTcs.SetResult(null); + } + catch (Exception e) + { + Console.WriteLine("Caught exception in a native handler: " + e); + } + } + + private static void AssertCallOk(GRPCCallError callError) + { + Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } + + private static CompletionQueueSafeHandle GetCompletionQueue() + { + return GrpcEnvironment.ThreadPool.CompletionQueue; + } + + private struct NewRpcInfo + { + private CallSafeHandle call; + private string method; + + public NewRpcInfo(CallSafeHandle call, string method) + { + this.call = call; + this.method = method; + } + + public CallSafeHandle Call + { + get + { + return this.call; + } + } + + public string Method + { + get + { + return this.method; + } + } + } + } +} diff --git a/src/csharp/Grpc.Core/ServerCallHandler.cs b/src/csharp/Grpc.Core/ServerCallHandler.cs new file mode 100644 index 0000000000..1296947f34 --- /dev/null +++ b/src/csharp/Grpc.Core/ServerCallHandler.cs @@ -0,0 +1,136 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using Grpc.Core.Internal; + +namespace Grpc.Core +{ + internal interface IServerCallHandler + { + void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq); + } + + internal class UnaryRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly UnaryRequestServerMethod<TRequest, TResponse> handler; + + public UnaryRequestServerCallHandler(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); + + asyncCall.InitializeServer(call); + + var finishedTask = asyncCall.ServerSideUnaryRequestCallAsync(); + + var request = asyncCall.ReceiveMessageAsync().Result; + + var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); + handler(request, responseObserver); + + finishedTask.Wait(); + + } + } + + internal class StreamingRequestServerCallHandler<TRequest, TResponse> : IServerCallHandler + { + readonly Method<TRequest, TResponse> method; + readonly StreamingRequestServerMethod<TRequest, TResponse> handler; + + public StreamingRequestServerCallHandler(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + { + this.method = method; + this.handler = handler; + } + + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + var asyncCall = new AsyncCall<TResponse, TRequest>( + method.ResponseMarshaller.Serializer, + method.RequestMarshaller.Deserializer); + + asyncCall.InitializeServer(call); + + var responseObserver = new ServerStreamingOutputObserver<TResponse, TRequest>(asyncCall); + var requestObserver = handler(responseObserver); + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(requestObserver); + finishedTask.Wait(); + } + } + + internal class NoSuchMethodCallHandler : IServerCallHandler + { + public void StartCall(string methodName, CallSafeHandle call, CompletionQueueSafeHandle cq) + { + // We don't care about the payload type here. + AsyncCall<byte[], byte[]> asyncCall = new AsyncCall<byte[], byte[]>( + (payload) => payload, (payload) => payload); + + + asyncCall.InitializeServer(call); + + var finishedTask = asyncCall.ServerSideStreamingRequestCallAsync(new NullObserver<byte[]>()); + + asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, "No such method.")).Wait(); + + finishedTask.Wait(); + } + } + + internal class NullObserver<T> : IObserver<T> + { + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(T value) + { + } + + } +} + diff --git a/src/csharp/Grpc.Core/ServerCalls.cs b/src/csharp/Grpc.Core/ServerCalls.cs new file mode 100644 index 0000000000..bed77796de --- /dev/null +++ b/src/csharp/Grpc.Core/ServerCalls.cs @@ -0,0 +1,58 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + // TODO: perhaps add also serverSideStreaming and clientSideStreaming + + public delegate void UnaryRequestServerMethod<TRequest, TResponse> (TRequest request, IObserver<TResponse> responseObserver); + + public delegate IObserver<TRequest> StreamingRequestServerMethod<TRequest, TResponse> (IObserver<TResponse> responseObserver); + + internal static class ServerCalls { + + public static IServerCallHandler UnaryRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, UnaryRequestServerMethod<TRequest, TResponse> handler) + { + return new UnaryRequestServerCallHandler<TRequest, TResponse>(method, handler); + } + + public static IServerCallHandler StreamingRequestCall<TRequest, TResponse>(Method<TRequest, TResponse> method, StreamingRequestServerMethod<TRequest, TResponse> handler) + { + return new StreamingRequestServerCallHandler<TRequest, TResponse>(method, handler); + } + + } +} + diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs new file mode 100644 index 0000000000..231c376062 --- /dev/null +++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs @@ -0,0 +1,98 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; + +namespace Grpc.Core +{ + public class ServerServiceDefinition + { + readonly string serviceName; + // TODO: we would need an immutable dictionary here... + readonly Dictionary<string, IServerCallHandler> callHandlers; + + private ServerServiceDefinition(string serviceName, Dictionary<string, IServerCallHandler> callHandlers) + { + this.serviceName = serviceName; + this.callHandlers = new Dictionary<string, IServerCallHandler>(callHandlers); + } + + internal Dictionary<string, IServerCallHandler> CallHandlers + { + get + { + return this.callHandlers; + } + } + + + public static Builder CreateBuilder(String serviceName) + { + return new Builder(serviceName); + } + + public class Builder + { + readonly string serviceName; + readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<String, IServerCallHandler>(); + + public Builder(string serviceName) + { + this.serviceName = serviceName; + } + + public Builder AddMethod<TRequest, TResponse>( + Method<TRequest, TResponse> method, + UnaryRequestServerMethod<TRequest, TResponse> handler) + { + callHandlers.Add(method.Name, ServerCalls.UnaryRequestCall(method, handler)); + return this; + } + + public Builder AddMethod<TRequest, TResponse>( + Method<TRequest, TResponse> method, + StreamingRequestServerMethod<TRequest, TResponse> handler) + { + callHandlers.Add(method.Name, ServerCalls.StreamingRequestCall(method, handler)); + return this; + } + + public ServerServiceDefinition Build() + { + return new ServerServiceDefinition(serviceName, callHandlers); + } + } + } +} + diff --git a/src/csharp/Grpc.Core/Status.cs b/src/csharp/Grpc.Core/Status.cs new file mode 100644 index 0000000000..5ea1df7b48 --- /dev/null +++ b/src/csharp/Grpc.Core/Status.cs @@ -0,0 +1,69 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; + +namespace Grpc.Core +{ + /// <summary> + /// Represents RPC result. + /// </summary> + public struct Status + { + readonly StatusCode statusCode; + readonly string detail; + + public Status(StatusCode statusCode, string detail) + { + this.statusCode = statusCode; + this.detail = detail; + } + + public StatusCode StatusCode + { + get + { + return statusCode; + } + } + + public string Detail + { + get + { + return detail; + } + } + } +} diff --git a/src/csharp/Grpc.Core/StatusCode.cs b/src/csharp/Grpc.Core/StatusCode.cs new file mode 100644 index 0000000000..1fbf9c1b68 --- /dev/null +++ b/src/csharp/Grpc.Core/StatusCode.cs @@ -0,0 +1,181 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core +{ + // TODO: element names should changed to comply with C# naming conventions. + /// <summary> + /// based on grpc_status_code from grpc/status.h + /// </summary> + public enum StatusCode + { + /* Not an error; returned on success + + HTTP Mapping: 200 OK */ + OK = 0, + /* The operation was cancelled (typically by the caller). + + HTTP Mapping: 499 Client Closed Request */ + Cancelled = 1, + /* Unknown error. An example of where this error may be returned is + if a Status value received from another address space belongs to + an error-space that is not known in this address space. Also + errors raised by APIs that do not return enough error information + may be converted to this error. + + HTTP Mapping: 500 Internal Server Error */ + Unknown = 2, + /* Client specified an invalid argument. Note that this differs + from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments + that are problematic regardless of the state of the system + (e.g., a malformed file name). + + HTTP Mapping: 400 Bad Request */ + InvalidArgument = 3, + /* Deadline expired before operation could complete. For operations + that change the state of the system, this error may be returned + even if the operation has completed successfully. For example, a + successful response from a server could have been delayed long + enough for the deadline to expire. + + HTTP Mapping: 504 Gateway Timeout */ + DeadlineExceeded = 4, + /* Some requested entity (e.g., file or directory) was not found. + + HTTP Mapping: 404 Not Found */ + NotFound = 5, + /* Some entity that we attempted to create (e.g., file or directory) + already exists. + + HTTP Mapping: 409 Conflict */ + AlreadyExists = 6, + /* The caller does not have permission to execute the specified + operation. PERMISSION_DENIED must not be used for rejections + caused by exhausting some resource (use RESOURCE_EXHAUSTED + instead for those errors). PERMISSION_DENIED must not be + used if the caller can not be identified (use UNAUTHENTICATED + instead for those errors). + + HTTP Mapping: 403 Forbidden */ + PermissionDenied = 7, + /* The request does not have valid authentication credentials for the + operation. + + HTTP Mapping: 401 Unauthorized */ + Unauthenticated = 16, + /* Some resource has been exhausted, perhaps a per-user quota, or + perhaps the entire file system is out of space. + + HTTP Mapping: 429 Too Many Requests */ + ResourceExhausted = 8, + /* Operation was rejected because the system is not in a state + required for the operation's execution. For example, directory + to be deleted may be non-empty, an rmdir operation is applied to + a non-directory, etc. + + A litmus test that may help a service implementor in deciding + between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE: + (a) Use UNAVAILABLE if the client can retry just the failing call. + (b) Use ABORTED if the client should retry at a higher-level + (e.g., restarting a read-modify-write sequence). + (c) Use FAILED_PRECONDITION if the client should not retry until + the system state has been explicitly fixed. E.g., if an "rmdir" + fails because the directory is non-empty, FAILED_PRECONDITION + should be returned since the client should not retry unless + they have first fixed up the directory by deleting files from it. + (d) Use FAILED_PRECONDITION if the client performs conditional + REST Get/Update/Delete on a resource and the resource on the + server does not match the condition. E.g., conflicting + read-modify-write on the same resource. + + HTTP Mapping: 400 Bad Request + + NOTE: HTTP spec says 412 Precondition Failed should only be used if + the request contains Etag related headers. So if the server does see + Etag related headers in the request, it may choose to return 412 + instead of 400 for this error code. */ + FailedPrecondition = 9, + /* The operation was aborted, typically due to a concurrency issue + like sequencer check failures, transaction aborts, etc. + + See litmus test above for deciding between FAILED_PRECONDITION, + ABORTED, and UNAVAILABLE. + + HTTP Mapping: 409 Conflict */ + Aborted = 10, + /* Operation was attempted past the valid range. E.g., seeking or + reading past end of file. + + Unlike INVALID_ARGUMENT, this error indicates a problem that may + be fixed if the system state changes. For example, a 32-bit file + system will generate INVALID_ARGUMENT if asked to read at an + offset that is not in the range [0,2^32-1], but it will generate + OUT_OF_RANGE if asked to read from an offset past the current + file size. + + There is a fair bit of overlap between FAILED_PRECONDITION and + OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific + error) when it applies so that callers who are iterating through + a space can easily look for an OUT_OF_RANGE error to detect when + they are done. + + HTTP Mapping: 400 Bad Request */ + OutOfRange = 11, + /* Operation is not implemented or not supported/enabled in this service. + + HTTP Mapping: 501 Not Implemented */ + Unimplemented = 12, + /* Internal errors. Means some invariants expected by underlying + system has been broken. If you see one of these errors, + something is very broken. + + HTTP Mapping: 500 Internal Server Error */ + Internal = 13, + /* The service is currently unavailable. This is a most likely a + transient condition and may be corrected by retrying with + a backoff. + + See litmus test above for deciding between FAILED_PRECONDITION, + ABORTED, and UNAVAILABLE. + + HTTP Mapping: 503 Service Unavailable */ + Unavailable = 14, + /* Unrecoverable data loss or corruption. + + HTTP Mapping: 500 Internal Server Error */ + DataLoss = 15 + } +} + diff --git a/src/csharp/Grpc.Core/Utils/RecordingObserver.cs b/src/csharp/Grpc.Core/Utils/RecordingObserver.cs new file mode 100644 index 0000000000..99d2725b70 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/RecordingObserver.cs @@ -0,0 +1,65 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Grpc.Core.Utils +{ + public class RecordingObserver<T> : IObserver<T> + { + TaskCompletionSource<List<T>> tcs = new TaskCompletionSource<List<T>>(); + List<T> data = new List<T>(); + + public void OnCompleted() + { + tcs.SetResult(data); + } + + public void OnError(Exception error) + { + tcs.SetException(error); + } + + public void OnNext(T value) + { + data.Add(value); + } + + public Task<List<T>> ToList() { + return tcs.Task; + } + } +} + diff --git a/src/csharp/Grpc.Core/Utils/RecordingQueue.cs b/src/csharp/Grpc.Core/Utils/RecordingQueue.cs new file mode 100644 index 0000000000..63992da6a9 --- /dev/null +++ b/src/csharp/Grpc.Core/Utils/RecordingQueue.cs @@ -0,0 +1,84 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Collections.Concurrent; + +namespace Grpc.Core.Utils +{ + // TODO: replace this by something that implements IAsyncEnumerator. + /// <summary> + /// Observer that allows us to await incoming messages one-by-one. + /// The implementation is not ideal and class will be probably replaced + /// by something more versatile in the future. + /// </summary> + public class RecordingQueue<T> : IObserver<T> + { + readonly BlockingCollection<T> queue = new BlockingCollection<T>(); + TaskCompletionSource<object> tcs = new TaskCompletionSource<object>(); + + public void OnCompleted() + { + tcs.SetResult(null); + } + + public void OnError(Exception error) + { + tcs.SetException(error); + } + + public void OnNext(T value) + { + queue.Add(value); + } + + public BlockingCollection<T> Queue + { + get + { + return queue; + } + } + + public Task Finished + { + get + { + return tcs.Task; + } + } + } +} + |