diff options
author | Hongyu Chen <hongyu@google.com> | 2015-08-24 10:59:48 -0700 |
---|---|---|
committer | Hongyu Chen <hongyu@google.com> | 2015-08-24 10:59:48 -0700 |
commit | 4473e52311604c27c94994509e4a6322cd0571dc (patch) | |
tree | fac0c7830939cf1aaad95a148f6ea5d1909793fc /src | |
parent | e09dc78e74f481dd06bf2f9ea643026f5e94bb5b (diff) | |
parent | 04715888e60c6195a2c1d9d6b31f7a82f0d717e2 (diff) |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'src')
83 files changed, 4128 insertions, 427 deletions
diff --git a/src/csharp/.gitignore b/src/csharp/.gitignore index ae48956567..48365e32a5 100644 --- a/src/csharp/.gitignore +++ b/src/csharp/.gitignore @@ -5,4 +5,5 @@ test-results packages Grpc.v12.suo TestResult.xml +/TestResults *.nupkg diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 2787572924..dfbd92879e 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -41,12 +41,6 @@ namespace Grpc.Core.Tests { public class ChannelTest { - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void Constructor_RejectsInvalidParams() { @@ -56,36 +50,33 @@ namespace Grpc.Core.Tests [Test] public void State_IdleAfterCreation() { - using (var channel = new Channel("localhost", Credentials.Insecure)) - { - Assert.AreEqual(ChannelState.Idle, channel.State); - } + var channel = new Channel("localhost", Credentials.Insecure); + Assert.AreEqual(ChannelState.Idle, channel.State); + channel.ShutdownAsync().Wait(); } [Test] public void WaitForStateChangedAsync_InvalidArgument() { - using (var channel = new Channel("localhost", Credentials.Insecure)) - { - Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); - } + var channel = new Channel("localhost", Credentials.Insecure); + Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); + channel.ShutdownAsync().Wait(); } [Test] public void ResolvedTarget() { - using (var channel = new Channel("127.0.0.1", Credentials.Insecure)) - { - Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1")); - } + var channel = new Channel("127.0.0.1", Credentials.Insecure); + Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1")); + channel.ShutdownAsync().Wait(); } [Test] - public void Dispose_IsIdempotent() + public void Shutdown_AllowedOnlyOnce() { var channel = new Channel("localhost", Credentials.Insecure); - channel.Dispose(); - channel.Dispose(); + channel.ShutdownAsync().Wait(); + Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult()); } } } diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index e49fdb5268..68279a2007 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -63,16 +63,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public async Task UnaryCall() { @@ -208,13 +202,6 @@ namespace Grpc.Core.Tests } [Test] - public void UnaryCall_DisposedChannel() - { - channel.Dispose(); - Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); - } - - [Test] public void UnaryCallPerformance() { helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs index 9547683f60..378c81851c 100644 --- a/src/csharp/Grpc.Core.Tests/CompressionTest.cs +++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs @@ -62,16 +62,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void WriteOptions_Unary() { diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs index db5f953b0e..2db3f286f7 100644 --- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs +++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs @@ -62,16 +62,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public async Task PropagateCancellation() { diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index d6a8f52570..b571fe9025 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -64,6 +64,8 @@ <Link>Version.cs</Link> </Compile> <Compile Include="ClientBaseTest.cs" /> + <Compile Include="ShutdownTest.cs" /> + <Compile Include="Internal\AsyncCallTest.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="ClientServerTest.cs" /> <Compile Include="ServerTest.cs" /> @@ -82,6 +84,7 @@ <Compile Include="ResponseHeadersTest.cs" /> <Compile Include="CompressionTest.cs" /> <Compile Include="ContextPropagationTest.cs" /> + <Compile Include="MetadataTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 4ed93c7eca..78295cf6d4 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -43,31 +43,40 @@ namespace Grpc.Core.Tests [Test] public void InitializeAndShutdownGrpcEnvironment() { - var env = GrpcEnvironment.GetInstance(); + var env = GrpcEnvironment.AddRef(); Assert.IsNotNull(env.CompletionQueue); - GrpcEnvironment.Shutdown(); + GrpcEnvironment.Release(); } [Test] public void SubsequentInvocations() { - var env1 = GrpcEnvironment.GetInstance(); - var env2 = GrpcEnvironment.GetInstance(); - Assert.IsTrue(object.ReferenceEquals(env1, env2)); - GrpcEnvironment.Shutdown(); - GrpcEnvironment.Shutdown(); + var env1 = GrpcEnvironment.AddRef(); + var env2 = GrpcEnvironment.AddRef(); + Assert.AreSame(env1, env2); + GrpcEnvironment.Release(); + GrpcEnvironment.Release(); } [Test] public void InitializeAfterShutdown() { - var env1 = GrpcEnvironment.GetInstance(); - GrpcEnvironment.Shutdown(); + Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); - var env2 = GrpcEnvironment.GetInstance(); - GrpcEnvironment.Shutdown(); + var env1 = GrpcEnvironment.AddRef(); + GrpcEnvironment.Release(); - Assert.IsFalse(object.ReferenceEquals(env1, env2)); + var env2 = GrpcEnvironment.AddRef(); + GrpcEnvironment.Release(); + + Assert.AreNotSame(env1, env2); + } + + [Test] + public void ReleaseWithoutAddRef() + { + Assert.AreEqual(0, GrpcEnvironment.GetRefCount()); + Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release()); } [Test] diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs new file mode 100644 index 0000000000..685c5f7d6c --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs @@ -0,0 +1,222 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +using Grpc.Core.Internal; +using NUnit.Framework; + +namespace Grpc.Core.Internal.Tests +{ + public class AsyncCallTest + { + Channel channel; + FakeNativeCall fakeCall; + AsyncCall<string, string> asyncCall; + + [SetUp] + public void Init() + { + channel = new Channel("localhost", Credentials.Insecure); + + fakeCall = new FakeNativeCall(); + + var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions()); + asyncCall = new AsyncCall<string, string>(callDetails, fakeCall); + } + + [TearDown] + public void Cleanup() + { + channel.ShutdownAsync().Wait(); + } + + [Test] + public void AsyncUnary_CompletionSuccess() + { + var resultTask = asyncCall.UnaryCallAsync("abc"); + fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }, new Metadata()); + Assert.IsTrue(resultTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus()); + } + + [Test] + public void AsyncUnary_CompletionFailure() + { + var resultTask = asyncCall.UnaryCallAsync("abc"); + fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(new Status(StatusCode.Internal, ""), null), new byte[] { 1, 2, 3 }, new Metadata()); + + Assert.IsTrue(resultTask.IsCompleted); + Assert.IsTrue(fakeCall.IsDisposed); + + Assert.AreEqual(StatusCode.Internal, asyncCall.GetStatus().StatusCode); + Assert.IsNull(asyncCall.GetTrailers()); + var ex = Assert.Throws<RpcException>(() => resultTask.GetAwaiter().GetResult()); + Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode); + } + + internal class FakeNativeCall : INativeCall + { + public UnaryResponseClientHandler UnaryResponseClientHandler + { + get; + set; + } + + public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler + { + get; + set; + } + + public ReceivedMessageHandler ReceivedMessageHandler + { + get; + set; + } + + public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler + { + get; + set; + } + + public SendCompletionHandler SendCompletionHandler + { + get; + set; + } + + public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler + { + get; + set; + } + + public bool IsCancelled + { + get; + set; + } + + public bool IsDisposed + { + get; + set; + } + + public void Cancel() + { + IsCancelled = true; + } + + public void CancelWithStatus(Status status) + { + IsCancelled = true; + } + + public string GetPeer() + { + return "PEER"; + } + + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + UnaryResponseClientHandler = callback; + } + + public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + throw new NotImplementedException(); + } + + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) + { + UnaryResponseClientHandler = callback; + } + + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) + { + ReceivedStatusOnClientHandler = callback; + } + + public void StartReceiveMessage(ReceivedMessageHandler callback) + { + ReceivedMessageHandler = callback; + } + + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + ReceivedResponseHeadersHandler = callback; + } + + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) + { + SendCompletionHandler = callback; + } + + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartSendCloseFromClient(SendCompletionHandler callback) + { + SendCompletionHandler = callback; + } + + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + { + SendCompletionHandler = callback; + } + + public void StartServerSide(ReceivedCloseOnServerHandler callback) + { + ReceivedCloseOnServerHandler = callback; + } + + public void Dispose() + { + IsDisposed = true; + } + } + } +}
\ No newline at end of file diff --git a/src/csharp/Grpc.Core.Tests/MetadataTest.cs b/src/csharp/Grpc.Core.Tests/MetadataTest.cs new file mode 100644 index 0000000000..c00f945d6a --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/MetadataTest.cs @@ -0,0 +1,120 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class MetadataTest + { + [Test] + public void AsciiEntry() + { + var entry = new Metadata.Entry("ABC", "XYZ"); + Assert.IsFalse(entry.IsBinary); + Assert.AreEqual("abc", entry.Key); // key is in lowercase. + Assert.AreEqual("XYZ", entry.Value); + CollectionAssert.AreEqual(new[] { (byte)'X', (byte)'Y', (byte)'Z' }, entry.ValueBytes); + + Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc-bin", "xyz")); + + Assert.AreEqual("[Entry: key=abc, value=XYZ]", entry.ToString()); + } + + [Test] + public void BinaryEntry() + { + var bytes = new byte[] { 1, 2, 3 }; + var entry = new Metadata.Entry("ABC-BIN", bytes); + Assert.IsTrue(entry.IsBinary); + Assert.AreEqual("abc-bin", entry.Key); // key is in lowercase. + Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; }); + CollectionAssert.AreEqual(bytes, entry.ValueBytes); + + Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc", bytes)); + + Assert.AreEqual("[Entry: key=abc-bin, valueBytes=System.Byte[]]", entry.ToString()); + } + + [Test] + public void Entry_ConstructionPreconditions() + { + Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry(null, "xyz")); + Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc", (string)null)); + Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc-bin", (byte[])null)); + } + + [Test] + public void Entry_Immutable() + { + var origBytes = new byte[] { 1, 2, 3 }; + var bytes = new byte[] { 1, 2, 3 }; + var entry = new Metadata.Entry("ABC-BIN", bytes); + bytes[0] = 255; // changing the array passed to constructor should have any effect. + CollectionAssert.AreEqual(origBytes, entry.ValueBytes); + + entry.ValueBytes[0] = 255; + CollectionAssert.AreEqual(origBytes, entry.ValueBytes); + } + + [Test] + public void Entry_CreateUnsafe_Ascii() + { + var bytes = new byte[] { (byte)'X', (byte)'y' }; + var entry = Metadata.Entry.CreateUnsafe("abc", bytes); + Assert.IsFalse(entry.IsBinary); + Assert.AreEqual("abc", entry.Key); + Assert.AreEqual("Xy", entry.Value); + CollectionAssert.AreEqual(bytes, entry.ValueBytes); + } + + [Test] + public void Entry_CreateUnsafe_Binary() + { + var bytes = new byte[] { 1, 2, 3 }; + var entry = Metadata.Entry.CreateUnsafe("abc-bin", bytes); + Assert.IsTrue(entry.IsBinary); + Assert.AreEqual("abc-bin", entry.Key); + Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; }); + CollectionAssert.AreEqual(bytes, entry.ValueBytes); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs index 981b8ea3c8..a1648f3671 100644 --- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs +++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs @@ -32,13 +32,16 @@ #endregion using System; +using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; + using Grpc.Core; using Grpc.Core.Internal; using Grpc.Core.Utils; + using NUnit.Framework; namespace Grpc.Core.Tests @@ -69,14 +72,82 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() + [Test] + public async Task ResponseHeadersAsync_UnaryCall() + { + helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => + { + await context.WriteResponseHeadersAsync(headers); + return "PASS"; + }); + + var call = Calls.AsyncUnaryCall(helper.CreateUnaryCall(), ""); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual(headers.Count, responseHeaders.Count); + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + Assert.AreEqual("abcdefg", responseHeaders[0].Value); + + Assert.AreEqual("PASS", await call.ResponseAsync); + } + + [Test] + public async Task ResponseHeadersAsync_ClientStreamingCall() + { + helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + return "PASS"; + }); + + var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall()); + await call.RequestStream.CompleteAsync(); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + Assert.AreEqual("PASS", await call.ResponseAsync); + } + + [Test] + public async Task ResponseHeadersAsync_ServerStreamingCall() + { + helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + await responseStream.WriteAsync("PASS"); + }); + + var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), ""); + var responseHeaders = await call.ResponseHeadersAsync; + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + CollectionAssert.AreEqual(new[] { "PASS" }, await call.ResponseStream.ToListAsync()); + } + + [Test] + public async Task ResponseHeadersAsync_DuplexStreamingCall() { - GrpcEnvironment.Shutdown(); + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => + { + await context.WriteResponseHeadersAsync(headers); + while (await requestStream.MoveNext()) + { + await responseStream.WriteAsync(requestStream.Current); + } + }); + + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall()); + var responseHeaders = await call.ResponseHeadersAsync; + + var messages = new[] { "PASS" }; + await call.RequestStream.WriteAllAsync(messages); + + Assert.AreEqual("ascii-header", responseHeaders[0].Key); + CollectionAssert.AreEqual(messages, await call.ResponseStream.ToListAsync()); } [Test] diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs index 485006ebac..e7193c843b 100644 --- a/src/csharp/Grpc.Core.Tests/ServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs @@ -51,7 +51,6 @@ namespace Grpc.Core.Tests }; server.Start(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] @@ -67,8 +66,7 @@ namespace Grpc.Core.Tests Assert.Greater(boundPort.BoundPort, 0); server.Start(); - server.ShutdownAsync(); - GrpcEnvironment.Shutdown(); + server.ShutdownAsync().Wait(); } [Test] @@ -83,7 +81,6 @@ namespace Grpc.Core.Tests Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build())); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } } } diff --git a/src/csharp/Grpc.Core.Tests/ShutdownTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs new file mode 100644 index 0000000000..a2be7ddd5e --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs @@ -0,0 +1,77 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class ShutdownTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [Test] + public async Task AbandonedCall() + { + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => + { + await requestStream.ToListAsync(); + }); + + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1)))); + + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs index d875d601b9..41f661f62d 100644 --- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -65,16 +65,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void InfiniteDeadline() { diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs index fb9b562c77..dbaa3085c5 100644 --- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs @@ -44,14 +44,16 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly Task<TResponse> responseAsync; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncClientStreamingCall(IClientStreamWriter<TRequest> requestStream, Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -69,6 +71,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Async stream to send streaming requests. /// </summary> public IClientStreamWriter<TRequest> RequestStream diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs index 183c84216a..ee7ba29695 100644 --- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; namespace Grpc.Core { @@ -42,14 +43,16 @@ namespace Grpc.Core { readonly IClientStreamWriter<TRequest> requestStream; readonly IAsyncStreamReader<TResponse> responseStream; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncDuplexStreamingCall(IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.requestStream = requestStream; this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -78,6 +81,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. /// </summary> diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs index ab2049f269..2853a79ce6 100644 --- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs +++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs @@ -32,6 +32,7 @@ #endregion using System; +using System.Threading.Tasks; namespace Grpc.Core { @@ -41,13 +42,15 @@ namespace Grpc.Core public sealed class AsyncServerStreamingCall<TResponse> : IDisposable { readonly IAsyncStreamReader<TResponse> responseStream; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncServerStreamingCall(IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.responseStream = responseStream; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -65,6 +68,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Gets the call status if the call has already finished. /// Throws InvalidOperationException otherwise. /// </summary> diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs index 224e343916..154a17a33e 100644 --- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs +++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs @@ -43,13 +43,15 @@ namespace Grpc.Core public sealed class AsyncUnaryCall<TResponse> : IDisposable { readonly Task<TResponse> responseAsync; + readonly Task<Metadata> responseHeadersAsync; readonly Func<Status> getStatusFunc; readonly Func<Metadata> getTrailersFunc; readonly Action disposeAction; - public AsyncUnaryCall(Task<TResponse> responseAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) + public AsyncUnaryCall(Task<TResponse> responseAsync, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) { this.responseAsync = responseAsync; + this.responseHeadersAsync = responseHeadersAsync; this.getStatusFunc = getStatusFunc; this.getTrailersFunc = getTrailersFunc; this.disposeAction = disposeAction; @@ -67,6 +69,17 @@ namespace Grpc.Core } /// <summary> + /// Asynchronous access to response headers. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return this.responseHeadersAsync; + } + } + + /// <summary> /// Allows awaiting this object directly. /// </summary> public TaskAwaiter<TResponse> GetAwaiter() diff --git a/src/csharp/Grpc.Core/Calls.cs b/src/csharp/Grpc.Core/Calls.cs index 7067456638..e57ac89db3 100644 --- a/src/csharp/Grpc.Core/Calls.cs +++ b/src/csharp/Grpc.Core/Calls.cs @@ -74,7 +74,7 @@ namespace Grpc.Core { var asyncCall = new AsyncCall<TRequest, TResponse>(call); var asyncResult = asyncCall.UnaryCallAsync(req); - return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncUnaryCall<TResponse>(asyncResult, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// <summary> @@ -93,7 +93,7 @@ namespace Grpc.Core var asyncCall = new AsyncCall<TRequest, TResponse>(call); asyncCall.StartServerStreamingCall(req); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncServerStreamingCall<TResponse>(responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// <summary> @@ -110,7 +110,7 @@ namespace Grpc.Core var asyncCall = new AsyncCall<TRequest, TResponse>(call); var resultTask = asyncCall.ClientStreamingCallAsync(); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); - return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncClientStreamingCall<TRequest, TResponse>(requestStream, resultTask, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } /// <summary> @@ -130,7 +130,7 @@ namespace Grpc.Core asyncCall.StartDuplexStreamingCall(); var requestStream = new ClientRequestStream<TRequest, TResponse>(asyncCall); var responseStream = new ClientResponseStream<TRequest, TResponse>(asyncCall); - return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); + return new AsyncDuplexStreamingCall<TRequest, TResponse>(requestStream, responseStream, asyncCall.ResponseHeadersAsync, asyncCall.GetStatus, asyncCall.GetTrailers, asyncCall.Cancel); } } } diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 64c6adf2bf..c11b320a64 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -45,15 +45,19 @@ namespace Grpc.Core /// <summary> /// gRPC Channel /// </summary> - public class Channel : IDisposable + public class Channel { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>(); + readonly object myLock = new object(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly string target; readonly GrpcEnvironment environment; readonly ChannelSafeHandle handle; readonly List<ChannelOption> options; - bool disposed; + + bool shutdownRequested; /// <summary> /// Creates a channel that connects to a specific host. @@ -65,7 +69,7 @@ namespace Grpc.Core public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null) { this.target = Preconditions.CheckNotNull(target, "target"); - this.environment = GrpcEnvironment.GetInstance(); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); EnsureUserAgentChannelOption(this.options); @@ -172,12 +176,26 @@ namespace Grpc.Core } /// <summary> - /// Destroys the underlying channel. + /// Waits until there are no more active calls for this channel and then cleans up + /// resources used by this channel. /// </summary> - public void Dispose() + public async Task ShutdownAsync() { - Dispose(true); - GC.SuppressFinalize(this); + lock (myLock) + { + Preconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount); + } + + handle.Dispose(); + + await Task.Run(() => GrpcEnvironment.Release()); } internal ChannelSafeHandle Handle @@ -196,13 +214,20 @@ namespace Grpc.Core } } - protected virtual void Dispose(bool disposing) + internal void AddCallReference(object call) { - if (disposing && handle != null && !disposed) - { - disposed = true; - handle.Dispose(); - } + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + + activeCallCounter.Decrement(); } private static void EnsureUserAgentChannelOption(List<ChannelOption> options) diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index 7bc100ca60..903449439b 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -119,7 +119,8 @@ namespace Grpc.Core internal static string GetAuthUriBase(string target) { var match = ChannelTargetPattern.Match(target); - if (!match.Success) { + if (!match.Success) + { return null; } return "https://" + match.Groups[2].Value + "/"; diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs index 2e4bfc9e47..a5bf1b5a70 100644 --- a/src/csharp/Grpc.Core/ContextPropagationToken.cs +++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs @@ -132,7 +132,6 @@ namespace Grpc.Core bool propagateDeadline; bool propagateCancellation; - /// <summary> /// Creates new context propagation options. /// </summary> diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj index 055aff1444..ad2af17bc7 100644 --- a/src/csharp/Grpc.Core/Grpc.Core.csproj +++ b/src/csharp/Grpc.Core/Grpc.Core.csproj @@ -49,6 +49,7 @@ <Compile Include="AsyncDuplexStreamingCall.cs" /> <Compile Include="AsyncServerStreamingCall.cs" /> <Compile Include="IClientStreamWriter.cs" /> + <Compile Include="Internal\INativeCall.cs" /> <Compile Include="IServerStreamWriter.cs" /> <Compile Include="IAsyncStreamWriter.cs" /> <Compile Include="IAsyncStreamReader.cs" /> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 30d8c80235..e7c04185c2 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -58,6 +58,7 @@ namespace Grpc.Core static object staticLock = new object(); static GrpcEnvironment instance; + static int refCount; static ILogger logger = new ConsoleLogger(); @@ -67,13 +68,14 @@ namespace Grpc.Core bool isClosed; /// <summary> - /// Returns an instance of initialized gRPC environment. - /// Subsequent invocations return the same instance unless Shutdown has been called first. + /// Returns a reference-counted instance of initialized gRPC environment. + /// Subsequent invocations return the same instance unless reference count has dropped to zero previously. /// </summary> - internal static GrpcEnvironment GetInstance() + internal static GrpcEnvironment AddRef() { lock (staticLock) { + refCount++; if (instance == null) { instance = new GrpcEnvironment(); @@ -83,14 +85,16 @@ namespace Grpc.Core } /// <summary> - /// Shuts down the gRPC environment if it was initialized before. - /// Blocks until the environment has been fully shutdown. + /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero. + /// (and blocks until the environment has been fully shutdown). /// </summary> - public static void Shutdown() + internal static void Release() { lock (staticLock) { - if (instance != null) + Preconditions.CheckState(refCount > 0); + refCount--; + if (refCount == 0) { instance.Close(); instance = null; @@ -98,6 +102,14 @@ namespace Grpc.Core } } + internal static int GetRefCount() + { + lock (staticLock) + { + return refCount; + } + } + /// <summary> /// Gets application-wide logger used by gRPC. /// </summary> @@ -125,12 +137,10 @@ namespace Grpc.Core private GrpcEnvironment() { NativeLogRedirector.Redirect(); - grpcsharp_init(); + GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); threadPool.Start(); - // TODO: use proper logging here - Logger.Info("gRPC initialized."); } /// <summary> @@ -175,6 +185,16 @@ namespace Grpc.Core return Marshal.PtrToStringAnsi(ptr); } + internal static void GrpcNativeInit() + { + grpcsharp_init(); + } + + internal static void GrpcNativeShutdown() + { + grpcsharp_shutdown(); + } + /// <summary> /// Shuts down this environment. /// </summary> @@ -185,12 +205,10 @@ namespace Grpc.Core throw new InvalidOperationException("Close has already been called"); } threadPool.Stop(); - grpcsharp_shutdown(); + GrpcNativeShutdown(); isClosed = true; debugStats.CheckOK(); - - Logger.Info("gRPC shutdown."); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 2c3e3d75ea..be5d611a53 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -51,22 +51,35 @@ namespace Grpc.Core.Internal static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<AsyncCall<TRequest, TResponse>>(); readonly CallInvocationDetails<TRequest, TResponse> details; + readonly INativeCall injectedNativeCall; // for testing // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; + // Indicates that steaming call has finished. + TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); + + // Response headers set here once received. + TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>(); + // Set after status is received. Used for both unary and streaming response calls. ClientSideStatus? finishedStatus; - bool readObserverCompleted; // True if readObserver has already been completed. - public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails) - : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer) + : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment) { this.details = callDetails.WithOptions(callDetails.Options.Normalize()); this.initialMetadataSent = true; // we always send metadata at the very beginning of the call. } + /// <summary> + /// This constructor should only be used for testing. + /// </summary> + public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails, INativeCall injectedNativeCall) : this(callDetails) + { + this.injectedNativeCall = injectedNativeCall; + } + // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but // it is reusing fair amount of code in this class, so we are leaving it here. /// <summary> @@ -100,7 +113,7 @@ namespace Grpc.Core.Internal bool success = (ev.success != 0); try { - HandleUnaryResponse(success, ctx); + HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata()); } catch (Exception e) { @@ -125,7 +138,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); halfcloseRequested = true; readingDone = true; @@ -152,7 +165,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); readingDone = true; @@ -176,10 +189,9 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); halfcloseRequested = true; - halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called. byte[] payload = UnsafeSerialize(msg); @@ -187,6 +199,7 @@ namespace Grpc.Core.Internal { call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -201,12 +214,13 @@ namespace Grpc.Core.Internal Preconditions.CheckState(!started); started = true; - Initialize(details.Channel.Environment.CompletionQueue); + Initialize(environment.CompletionQueue); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); } + call.StartReceiveInitialMetadata(HandleReceivedResponseHeaders); } } @@ -248,6 +262,28 @@ namespace Grpc.Core.Internal } /// <summary> + /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise. + /// </summary> + public Task StreamingCallFinishedTask + { + get + { + return streamingCallFinishedTcs.Task; + } + } + + /// <summary> + /// Get the task that completes once response headers are received. + /// </summary> + public Task<Metadata> ResponseHeadersAsync + { + get + { + return responseHeadersTcs.Task; + } + } + + /// <summary> /// Gets the resulting status if the call has already finished. /// Throws InvalidOperationException otherwise. /// </summary> @@ -281,51 +317,31 @@ namespace Grpc.Core.Internal } } - /// <summary> - /// On client-side, we only fire readCompletionDelegate once all messages have been read - /// and status has been received. - /// </summary> - protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate) + protected override void OnAfterReleaseResources() { - if (completionDelegate != null && readingDone && finishedStatus.HasValue) - { - bool shouldComplete; - lock (myLock) - { - shouldComplete = !readObserverCompleted; - readObserverCompleted = true; - } - - if (shouldComplete) - { - var status = finishedStatus.Value.Status; - if (status.StatusCode != StatusCode.OK) - { - FireCompletion(completionDelegate, default(TResponse), new RpcException(status)); - } - else - { - FireCompletion(completionDelegate, default(TResponse), null); - } - } - } + details.Channel.RemoveCallReference(this); } - protected override void OnReleaseResources() + private void Initialize(CompletionQueueSafeHandle cq) { - details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + var call = CreateNativeCall(cq); + details.Channel.AddCallReference(this); + InitializeInternal(call); + RegisterCancellationCallback(); } - private void Initialize(CompletionQueueSafeHandle cq) + private INativeCall CreateNativeCall(CompletionQueueSafeHandle cq) { + if (injectedNativeCall != null) + { + return injectedNativeCall; // allows injecting a mock INativeCall in tests. + } + var parentCall = details.Options.PropagationToken != null ? details.Options.PropagationToken.ParentCall : CallSafeHandle.NullInstance; - var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, + return details.Channel.Handle.CreateCall(environment.CompletionRegistry, parentCall, ContextPropagationToken.DefaultMask, cq, details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value)); - details.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); - InitializeInternal(call); - RegisterCancellationCallback(); } // Make sure that once cancellationToken for this call is cancelled, Cancel() will be called. @@ -348,31 +364,31 @@ namespace Grpc.Core.Internal } /// <summary> - /// Handler for unary response completion. + /// Handles receive status completion for calls with streaming response. /// </summary> - private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx) + private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders) { - var fullStatus = ctx.GetReceivedStatusOnClient(); + responseHeadersTcs.SetResult(responseHeaders); + } + /// <summary> + /// Handler for unary response completion. + /// </summary> + private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders) + { lock (myLock) { finished = true; - finishedStatus = fullStatus; - - halfclosed = true; + finishedStatus = receivedStatus; ReleaseResourcesIfPossible(); } - if (!success) - { - unaryResponseTcs.SetException(new RpcException(new Status(StatusCode.Internal, "Internal error occured."))); - return; - } + responseHeadersTcs.SetResult(responseHeaders); - var status = fullStatus.Status; + var status = receivedStatus.Status; - if (status.StatusCode != StatusCode.OK) + if (!success || status.StatusCode != StatusCode.OK) { unaryResponseTcs.SetException(new RpcException(status)); return; @@ -380,7 +396,7 @@ namespace Grpc.Core.Internal // TODO: handle deserialization error TResponse msg; - TryDeserialize(ctx.GetReceivedMessage(), out msg); + TryDeserialize(receivedMessage, out msg); unaryResponseTcs.SetResult(msg); } @@ -388,22 +404,25 @@ namespace Grpc.Core.Internal /// <summary> /// Handles receive status completion for calls with streaming response. /// </summary> - private void HandleFinished(bool success, BatchContextSafeHandle ctx) + private void HandleFinished(bool success, ClientSideStatus receivedStatus) { - var fullStatus = ctx.GetReceivedStatusOnClient(); - - AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null; lock (myLock) { finished = true; - finishedStatus = fullStatus; - - origReadCompletionDelegate = readCompletionDelegate; + finishedStatus = receivedStatus; ReleaseResourcesIfPossible(); } - ProcessLastRead(origReadCompletionDelegate); + var status = receivedStatus.Status; + + if (!success || status.StatusCode != StatusCode.OK) + { + streamingCallFinishedTcs.SetException(new RpcException(status)); + return; + } + + streamingCallFinishedTcs.SetResult(null); } } }
\ No newline at end of file diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 6ca4bbdafc..4d20394644 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -54,30 +54,30 @@ namespace Grpc.Core.Internal readonly Func<TWrite, byte[]> serializer; readonly Func<byte[], TRead> deserializer; + protected readonly GrpcEnvironment environment; protected readonly object myLock = new object(); - protected CallSafeHandle call; + protected INativeCall call; protected bool disposed; protected bool started; - protected bool errorOccured; protected bool cancelRequested; protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null. protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null. - protected bool readingDone; - protected bool halfcloseRequested; - protected bool halfclosed; + protected bool readingDone; // True if last read (i.e. read with null payload) was already received. + protected bool halfcloseRequested; // True if send close have been initiated. protected bool finished; // True if close has been received from the peer. protected bool initialMetadataSent; - protected long streamingWritesCounter; + protected long streamingWritesCounter; // Number of streaming send operations started so far. - public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer) + public AsyncCallBase(Func<TWrite, byte[]> serializer, Func<byte[], TRead> deserializer, GrpcEnvironment environment) { this.serializer = Preconditions.CheckNotNull(serializer); this.deserializer = Preconditions.CheckNotNull(deserializer); + this.environment = Preconditions.CheckNotNull(environment); } /// <summary> @@ -114,7 +114,7 @@ namespace Grpc.Core.Internal } } - protected void InitializeInternal(CallSafeHandle call) + protected void InitializeInternal(INativeCall call) { lock (myLock) { @@ -159,16 +159,6 @@ namespace Grpc.Core.Internal } } - // TODO(jtattermusch): find more fitting name for this method. - /// <summary> - /// Default behavior just completes the read observer, but more sofisticated behavior might be required - /// by subclasses. - /// </summary> - protected virtual void ProcessLastRead(AsyncCompletionDelegate<TRead> completionDelegate) - { - FireCompletion(completionDelegate, default(TRead), null); - } - /// <summary> /// If there are no more pending actions and no new actions can be started, releases /// the underlying native resources. @@ -177,7 +167,7 @@ namespace Grpc.Core.Internal { if (!disposed && call != null) { - bool noMoreSendCompletions = halfclosed || (cancelRequested && sendCompletionDelegate == null); + bool noMoreSendCompletions = sendCompletionDelegate == null && (halfcloseRequested || cancelRequested || finished); if (noMoreSendCompletions && readingDone && finished) { ReleaseResources(); @@ -189,34 +179,33 @@ namespace Grpc.Core.Internal private void ReleaseResources() { - OnReleaseResources(); if (call != null) { call.Dispose(); } disposed = true; + OnAfterReleaseResources(); } - protected virtual void OnReleaseResources() + protected virtual void OnAfterReleaseResources() { } protected void CheckSendingAllowed() { Preconditions.CheckState(started); - Preconditions.CheckState(!errorOccured); CheckNotCancelled(); Preconditions.CheckState(!disposed); Preconditions.CheckState(!halfcloseRequested, "Already halfclosed."); + Preconditions.CheckState(!finished, "Already finished."); Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected void CheckReadingAllowed() + protected virtual void CheckReadingAllowed() { Preconditions.CheckState(started); Preconditions.CheckState(!disposed); - Preconditions.CheckState(!errorOccured); Preconditions.CheckState(!readingDone, "Stream has already been closed."); Preconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time"); @@ -280,7 +269,7 @@ namespace Grpc.Core.Internal /// <summary> /// Handles send completion. /// </summary> - protected void HandleSendFinished(bool success, BatchContextSafeHandle ctx) + protected void HandleSendFinished(bool success) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) @@ -304,12 +293,11 @@ namespace Grpc.Core.Internal /// <summary> /// Handles halfclose completion. /// </summary> - protected void HandleHalfclosed(bool success, BatchContextSafeHandle ctx) + protected void HandleHalfclosed(bool success) { AsyncCompletionDelegate<object> origCompletionDelegate = null; lock (myLock) { - halfclosed = true; origCompletionDelegate = sendCompletionDelegate; sendCompletionDelegate = null; @@ -329,23 +317,17 @@ namespace Grpc.Core.Internal /// <summary> /// Handles streaming read completion. /// </summary> - protected void HandleReadFinished(bool success, BatchContextSafeHandle ctx) + protected void HandleReadFinished(bool success, byte[] receivedMessage) { - var payload = ctx.GetReceivedMessage(); - AsyncCompletionDelegate<TRead> origCompletionDelegate = null; lock (myLock) { origCompletionDelegate = readCompletionDelegate; - if (payload != null) - { - readCompletionDelegate = null; - } - else + readCompletionDelegate = null; + + if (receivedMessage == null) { - // This was the last read. Keeping the readCompletionDelegate - // to be either fired by this handler or by client-side finished - // handler. + // This was the last read. readingDone = true; } @@ -354,17 +336,17 @@ namespace Grpc.Core.Internal // TODO: handle the case when error occured... - if (payload != null) + if (receivedMessage != null) { // TODO: handle deserialization error TRead msg; - TryDeserialize(payload, out msg); + TryDeserialize(receivedMessage, out msg); FireCompletion(origCompletionDelegate, msg, null); } else { - ProcessLastRead(origCompletionDelegate); + FireCompletion(origCompletionDelegate, default(TRead), null); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 3710a65d6b..5c47251030 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -49,17 +49,18 @@ namespace Grpc.Core.Internal { readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); - readonly GrpcEnvironment environment; + readonly Server server; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer, environment) { - this.environment = Preconditions.CheckNotNull(environment); + this.server = Preconditions.CheckNotNull(server); } public void Initialize(CallSafeHandle call) { call.SetCompletionRegistry(environment.CompletionRegistry); - environment.DebugStats.ActiveServerCalls.Increment(); + + server.AddCallReference(this); InitializeInternal(call); } @@ -168,18 +169,22 @@ namespace Grpc.Core.Internal } } - protected override void OnReleaseResources() + protected override void CheckReadingAllowed() { - environment.DebugStats.ActiveServerCalls.Decrement(); + base.CheckReadingAllowed(); + Preconditions.CheckArgument(!cancelRequested); + } + + protected override void OnAfterReleaseResources() + { + server.RemoveCallReference(this); } /// <summary> /// Handles the server side close completion. /// </summary> - private void HandleFinishedServerside(bool success, BatchContextSafeHandle ctx) + private void HandleFinishedServerside(bool success, bool cancelled) { - bool cancelled = ctx.GetReceivedCloseOnServerCancelled(); - lock (myLock) { finished = true; diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 6a2add54db..3a96414bea 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -134,7 +134,7 @@ namespace Grpc.Core.Internal } // Gets data of server_rpc_new completion. - public ServerRpcNew GetServerRpcNew() + public ServerRpcNew GetServerRpcNew(Server server) { var call = grpcsharp_batch_context_server_rpc_new_call(this); @@ -145,7 +145,7 @@ namespace Grpc.Core.Internal IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this); var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); - return new ServerRpcNew(call, method, host, deadline, metadata); + return new ServerRpcNew(server, call, method, host, deadline, metadata); } // Gets data of receive_close_on_server completion. @@ -198,14 +198,16 @@ namespace Grpc.Core.Internal /// </summary> internal struct ServerRpcNew { + readonly Server server; readonly CallSafeHandle call; readonly string method; readonly string host; readonly Timespec deadline; readonly Metadata requestMetadata; - public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) + public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) { + this.server = server; this.call = call; this.method = method; this.host = host; @@ -213,6 +215,14 @@ namespace Grpc.Core.Internal this.requestMetadata = requestMetadata; } + public Server Server + { + get + { + return this.server; + } + } + public CallSafeHandle Call { get diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 3cb01e29bd..0f187529e8 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -40,7 +40,7 @@ namespace Grpc.Core.Internal /// <summary> /// grpc_call from <grpc/grpc.h> /// </summary> - internal class CallSafeHandle : SafeHandleZeroIsInvalid + internal class CallSafeHandle : SafeHandleZeroIsInvalid, INativeCall { public static readonly CallSafeHandle NullInstance = new CallSafeHandle(); @@ -87,6 +87,10 @@ namespace Grpc.Core.Internal BatchContextSafeHandle ctx); [DllImport("grpc_csharp_ext.dll")] + static extern GRPCCallError grpcsharp_call_recv_initial_metadata(CallSafeHandle call, + BatchContextSafeHandle ctx); + + [DllImport("grpc_csharp_ext.dll")] static extern GRPCCallError grpcsharp_call_start_serverside(CallSafeHandle call, BatchContextSafeHandle ctx); @@ -109,10 +113,10 @@ namespace Grpc.Core.Internal this.completionRegistry = completionRegistry; } - public void StartUnary(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); grpcsharp_call_start_unary(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags) .CheckOk(); } @@ -123,66 +127,73 @@ namespace Grpc.Core.Internal .CheckOk(); } - public void StartClientStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata())); grpcsharp_call_start_client_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartServerStreaming(BatchCompletionDelegate callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) + public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); grpcsharp_call_start_server_streaming(this, ctx, payload, new UIntPtr((ulong)payload.Length), metadataArray, writeFlags).CheckOk(); } - public void StartDuplexStreaming(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedStatusOnClient())); grpcsharp_call_start_duplex_streaming(this, ctx, metadataArray).CheckOk(); } - public void StartSendMessage(BatchCompletionDelegate callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) + public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_message(this, ctx, payload, new UIntPtr((ulong)payload.Length), writeFlags, sendEmptyInitialMetadata).CheckOk(); } - public void StartSendCloseFromClient(BatchCompletionDelegate callback) + public void StartSendCloseFromClient(SendCompletionHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_close_from_client(this, ctx).CheckOk(); } - public void StartSendStatusFromServer(BatchCompletionDelegate callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) + public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk(); } - public void StartReceiveMessage(BatchCompletionDelegate callback) + public void StartReceiveMessage(ReceivedMessageHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedMessage())); grpcsharp_call_recv_message(this, ctx).CheckOk(); } - public void StartServerSide(BatchCompletionDelegate callback) + public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback) + { + var ctx = BatchContextSafeHandle.Create(); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedInitialMetadata())); + grpcsharp_call_recv_initial_metadata(this, ctx).CheckOk(); + } + + public void StartServerSide(ReceivedCloseOnServerHandler callback) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success, context.GetReceivedCloseOnServerCancelled())); grpcsharp_call_start_serverside(this, ctx).CheckOk(); } - public void StartSendInitialMetadata(BatchCompletionDelegate callback, MetadataArraySafeHandle metadataArray) + public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray) { var ctx = BatchContextSafeHandle.Create(); - completionRegistry.RegisterBatchCompletion(ctx, callback); + completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success)); grpcsharp_call_send_initial_metadata(this, ctx, metadataArray).CheckOk(); } diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 7f03bf4ea5..8cef566c14 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -68,11 +68,17 @@ namespace Grpc.Core.Internal public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_insecure_channel_create(target, channelArgs); } public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_secure_channel_create(credentials, target, channelArgs); } @@ -107,6 +113,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_channel_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index 6c44521038..b4a7335c7c 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -72,7 +72,13 @@ namespace Grpc.Core.Internal call.StartReadMessage(taskSource.CompletionDelegate); var result = await taskSource.Task; this.current = result; - return result != null; + + if (result == null) + { + await call.StreamingCallFinishedTask; + return false; + } + return true; } public void Dispose() diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs index 8793450ff3..1bea1adf9e 100644 --- a/src/csharp/Grpc.Core/Internal/DebugStats.cs +++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs @@ -38,10 +38,6 @@ namespace Grpc.Core.Internal { internal class DebugStats { - public readonly AtomicCounter ActiveClientCalls = new AtomicCounter(); - - public readonly AtomicCounter ActiveServerCalls = new AtomicCounter(); - public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); /// <summary> @@ -49,16 +45,6 @@ namespace Grpc.Core.Internal /// </summary> public void CheckOK() { - var remainingClientCalls = ActiveClientCalls.Count; - if (remainingClientCalls != 0) - { - DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls)); - } - var remainingServerCalls = ActiveServerCalls.Count; - if (remainingServerCalls != 0) - { - DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls)); - } var pendingBatchCompletions = PendingBatchCompletions.Count; if (pendingBatchCompletions != 0) { diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index cb4c7c821e..4b7124ee74 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -83,8 +83,6 @@ namespace Grpc.Core.Internal lock (myLock) { cq.Shutdown(); - - Logger.Info("Waiting for GRPC threads to finish."); foreach (var thread in threads) { thread.Join(); @@ -136,7 +134,6 @@ namespace Grpc.Core.Internal } } while (ev.type != GRPCCompletionType.Shutdown); - Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name); } } } diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs new file mode 100644 index 0000000000..cbef599139 --- /dev/null +++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs @@ -0,0 +1,85 @@ +#region Copyright notice and license +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.Core.Internal +{ + internal delegate void UnaryResponseClientHandler(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders); + + // Received status for streaming response calls. + internal delegate void ReceivedStatusOnClientHandler(bool success, ClientSideStatus receivedStatus); + + internal delegate void ReceivedMessageHandler(bool success, byte[] receivedMessage); + + internal delegate void ReceivedResponseHeadersHandler(bool success, Metadata responseHeaders); + + internal delegate void SendCompletionHandler(bool success); + + internal delegate void ReceivedCloseOnServerHandler(bool success, bool cancelled); + + /// <summary> + /// Abstraction of a native call object. + /// </summary> + internal interface INativeCall : IDisposable + { + void Cancel(); + + void CancelWithStatus(Grpc.Core.Status status); + + string GetPeer(); + + void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray); + + void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, Grpc.Core.WriteFlags writeFlags); + + void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray); + + void StartReceiveMessage(ReceivedMessageHandler callback); + + void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback); + + void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray); + + void StartSendMessage(SendCompletionHandler callback, byte[] payload, Grpc.Core.WriteFlags writeFlags, bool sendEmptyInitialMetadata); + + void StartSendCloseFromClient(SendCompletionHandler callback); + + void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata); + + void StartServerSide(ReceivedCloseOnServerHandler callback); + } +} diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs index 427c16fac6..83994f6762 100644 --- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs @@ -70,7 +70,8 @@ namespace Grpc.Core.Internal var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count)); for (int i = 0; i < metadata.Count; i++) { - grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, metadata[i].ValueBytes, new UIntPtr((ulong)metadata[i].ValueBytes.Length)); + var valueBytes = metadata[i].GetSerializedValueUnsafe(); + grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length)); } return metadataArray; } @@ -94,7 +95,7 @@ namespace Grpc.Core.Internal string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index)); var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()]; Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length); - metadata.Add(new Metadata.Entry(key, bytes)); + metadata.Add(Metadata.Entry.CreateUnsafe(key, bytes)); } return metadata; } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 688f9f6fec..59f4c5727c 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -67,7 +67,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -123,7 +123,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -179,7 +179,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -239,7 +239,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -278,7 +278,7 @@ namespace Grpc.Core.Internal { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload, environment); + (payload) => payload, (payload) => payload, environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index f9b44b1acf..5ee7ac14e8 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -74,6 +74,9 @@ namespace Grpc.Core.Internal public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_server_create(cq, args); } @@ -109,6 +112,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_server_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs index 382481d871..35561d25d8 100644 --- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs +++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs @@ -51,7 +51,19 @@ namespace Grpc.Core.Logging private ConsoleLogger(Type forType) { this.forType = forType; - this.forTypeString = forType != null ? forType.FullName + " " : ""; + if (forType != null) + { + var namespaceStr = forType.Namespace ?? ""; + if (namespaceStr.Length > 0) + { + namespaceStr += "."; + } + this.forTypeString = namespaceStr + forType.Name + " "; + } + else + { + this.forTypeString = ""; + } } /// <summary> diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index 9db2abf46e..a589b50caa 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -46,6 +46,11 @@ namespace Grpc.Core public sealed class Metadata : IList<Metadata.Entry> { /// <summary> + /// All binary headers should have this suffix. + /// </summary> + public const string BinaryHeaderSuffix = "-bin"; + + /// <summary> /// An read-only instance of metadata containing no entries. /// </summary> public static readonly Metadata Empty = new Metadata().Freeze(); @@ -181,23 +186,49 @@ namespace Grpc.Core private static readonly Encoding Encoding = Encoding.ASCII; readonly string key; - string value; - byte[] valueBytes; + readonly string value; + readonly byte[] valueBytes; + + private Entry(string key, string value, byte[] valueBytes) + { + this.key = key; + this.value = value; + this.valueBytes = valueBytes; + } + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct with a binary value. + /// </summary> + /// <param name="key">Metadata key, needs to have suffix indicating a binary valued metadata entry.</param> + /// <param name="valueBytes">Value bytes.</param> public Entry(string key, byte[] valueBytes) { - this.key = Preconditions.CheckNotNull(key, "key"); + this.key = NormalizeKey(key); + Preconditions.CheckArgument(this.key.EndsWith(BinaryHeaderSuffix), + "Key for binary valued metadata entry needs to have suffix indicating binary value."); this.value = null; - this.valueBytes = Preconditions.CheckNotNull(valueBytes, "valueBytes"); + Preconditions.CheckNotNull(valueBytes, "valueBytes"); + this.valueBytes = new byte[valueBytes.Length]; + Buffer.BlockCopy(valueBytes, 0, this.valueBytes, 0, valueBytes.Length); // defensive copy to guarantee immutability } + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct holding an ASCII value. + /// </summary> + /// <param name="key">Metadata key, must not use suffix indicating a binary valued metadata entry.</param> + /// <param name="value">Value string. Only ASCII characters are allowed.</param> public Entry(string key, string value) { - this.key = Preconditions.CheckNotNull(key, "key"); + this.key = NormalizeKey(key); + Preconditions.CheckArgument(!this.key.EndsWith(BinaryHeaderSuffix), + "Key for ASCII valued metadata entry cannot have suffix indicating binary value."); this.value = Preconditions.CheckNotNull(value, "value"); this.valueBytes = null; } + /// <summary> + /// Gets the metadata entry key. + /// </summary> public string Key { get @@ -206,33 +237,86 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the binary value of this metadata entry. + /// </summary> public byte[] ValueBytes { get { if (valueBytes == null) { - valueBytes = Encoding.GetBytes(value); + return Encoding.GetBytes(value); } - return valueBytes; + + // defensive copy to guarantee immutability + var bytes = new byte[valueBytes.Length]; + Buffer.BlockCopy(valueBytes, 0, bytes, 0, valueBytes.Length); + return bytes; } } + /// <summary> + /// Gets the string value of this metadata entry. + /// </summary> public string Value { get { - if (value == null) - { - value = Encoding.GetString(valueBytes); - } - return value; + Preconditions.CheckState(!IsBinary, "Cannot access string value of a binary metadata entry"); + return value ?? Encoding.GetString(valueBytes); } } - + + /// <summary> + /// Returns <c>true</c> if this entry is a binary-value entry. + /// </summary> + public bool IsBinary + { + get + { + return value == null; + } + } + + /// <summary> + /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Metadata+Entry"/>. + /// </summary> public override string ToString() { - return string.Format("[Entry: key={0}, value={1}]", Key, Value); + if (IsBinary) + { + return string.Format("[Entry: key={0}, valueBytes={1}]", key, valueBytes); + } + + return string.Format("[Entry: key={0}, value={1}]", key, value); + } + + /// <summary> + /// Gets the serialized value for this entry. For binary metadata entries, this leaks + /// the internal <c>valueBytes</c> byte array and caller must not change contents of it. + /// </summary> + internal byte[] GetSerializedValueUnsafe() + { + return valueBytes ?? Encoding.GetBytes(value); + } + + /// <summary> + /// Creates a binary value or ascii value metadata entry from data received from the native layer. + /// We trust C core to give us well-formed data, so we don't perform any checks or defensive copying. + /// </summary> + internal static Entry CreateUnsafe(string key, byte[] valueBytes) + { + if (key.EndsWith(BinaryHeaderSuffix)) + { + return new Entry(key, null, valueBytes); + } + return new Entry(key, Encoding.GetString(valueBytes), null); + } + + private static string NormalizeKey(string key) + { + return Preconditions.CheckNotNull(key, "key").ToLower(); } } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index c76f126026..28f1686e20 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -50,6 +50,8 @@ namespace Grpc.Core { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly ServiceDefinitionCollection serviceDefinitions; readonly ServerPortCollection ports; readonly GrpcEnvironment environment; @@ -73,7 +75,7 @@ namespace Grpc.Core { this.serviceDefinitions = new ServiceDefinitionCollection(this); this.ports = new ServerPortCollection(this); - this.environment = GrpcEnvironment.GetInstance(); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { @@ -106,6 +108,17 @@ namespace Grpc.Core } /// <summary> + /// To allow awaiting termination of the server. + /// </summary> + public Task ShutdownTask + { + get + { + return shutdownTcs.Task; + } + } + + /// <summary> /// Starts the server. /// </summary> public void Start() @@ -136,18 +149,9 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); await shutdownTcs.Task; - handle.Dispose(); - } + DisposeHandle(); - /// <summary> - /// To allow awaiting termination of the server. - /// </summary> - public Task ShutdownTask - { - get - { - return shutdownTcs.Task; - } + await Task.Run(() => GrpcEnvironment.Release()); } /// <summary> @@ -166,7 +170,22 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); handle.CancelAllCalls(); await shutdownTcs.Task; - handle.Dispose(); + DisposeHandle(); + } + + internal void AddCallReference(object call) + { + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + activeCallCounter.Decrement(); } /// <summary> @@ -227,6 +246,16 @@ namespace Grpc.Core } } + private void DisposeHandle() + { + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount); + } + handle.Dispose(); + } + /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> @@ -254,7 +283,7 @@ namespace Grpc.Core { if (success) { - ServerRpcNew newRpc = ctx.GetServerRpcNew(); + ServerRpcNew newRpc = ctx.GetServerRpcNew(this); // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs index f9839d99f1..abd95cb905 100644 --- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs +++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs @@ -39,23 +39,21 @@ namespace math { public static void Main(string[] args) { - using (Channel channel = new Channel("127.0.0.1", 23456, Credentials.Insecure)) - { - Math.IMathClient client = new Math.MathClient(channel); - MathExamples.DivExample(client); + var channel = new Channel("127.0.0.1", 23456, Credentials.Insecure); + Math.IMathClient client = new Math.MathClient(channel); + MathExamples.DivExample(client); - MathExamples.DivAsyncExample(client).Wait(); + MathExamples.DivAsyncExample(client).Wait(); - MathExamples.FibExample(client).Wait(); + MathExamples.FibExample(client).Wait(); - MathExamples.SumExample(client).Wait(); + MathExamples.SumExample(client).Wait(); - MathExamples.DivManyExample(client).Wait(); + MathExamples.DivManyExample(client).Wait(); - MathExamples.DependendRequestsExample(client).Wait(); - } + MathExamples.DependendRequestsExample(client).Wait(); - GrpcEnvironment.Shutdown(); + channel.ShutdownAsync().Wait(); } } } diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs index 5f7e717b0c..26bef646ec 100644 --- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs +++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs @@ -56,7 +56,6 @@ namespace math Console.ReadKey(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } } } diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index fdef950f09..36c1c947bd 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -68,9 +68,8 @@ namespace math.Tests [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs index 024377e216..80c35fb197 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs @@ -71,10 +71,9 @@ namespace Grpc.HealthCheck.Tests [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index f4b0a1028f..24c22273fb 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -37,13 +37,15 @@ using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using Google.Apis.Auth.OAuth2; using Google.ProtocolBuffers; + using grpc.testing; using Grpc.Auth; using Grpc.Core; using Grpc.Core.Utils; + using NUnit.Framework; -using Google.Apis.Auth.OAuth2; namespace Grpc.IntegrationTesting { @@ -118,12 +120,10 @@ namespace Grpc.IntegrationTesting }; } - using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions)) - { - TestService.TestServiceClient client = new TestService.TestServiceClient(channel); - await RunTestCaseAsync(options.testCase, client); - } - GrpcEnvironment.Shutdown(); + var channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions); + TestService.TestServiceClient client = new TestService.TestServiceClient(channel); + await RunTestCaseAsync(options.testCase, client); + channel.ShutdownAsync().Wait(); } private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client) @@ -169,6 +169,9 @@ namespace Grpc.IntegrationTesting case "cancel_after_first_response": await RunCancelAfterFirstResponseAsync(client); break; + case "timeout_on_sleeping_server": + await RunTimeoutOnSleepingServerAsync(client); + break; case "benchmark_empty_unary": RunBenchmarkEmptyUnary(client); break; @@ -458,6 +461,29 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } + public static async Task RunTimeoutOnSleepingServerAsync(TestService.ITestServiceClient client) + { + Console.WriteLine("running timeout_on_sleeping_server"); + + var deadline = DateTime.UtcNow.AddMilliseconds(1); + using (var call = client.FullDuplexCall(deadline: deadline)) + { + try + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetPayload(CreateZerosPayload(27182)).Build()); + } + catch (InvalidOperationException) + { + // Deadline was reached before write has started. Eat the exception and continue. + } + + var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.DeadlineExceeded, ex.Status.StatusCode); + } + Console.WriteLine("Passed!"); + } + // This is not an official interop test, but it's useful. public static void RunBenchmarkEmptyUnary(TestService.ITestServiceClient client) { diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs index 6fa721bc1c..f3158aeb45 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -75,9 +75,8 @@ namespace Grpc.IntegrationTesting [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] @@ -127,5 +126,11 @@ namespace Grpc.IntegrationTesting { await InteropClient.RunCancelAfterFirstResponseAsync(client); } + + [Test] + public async Task TimeoutOnSleepingServerAsync() + { + await InteropClient.RunTimeoutOnSleepingServerAsync(client); + } } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs index 504fd11857..0cc8b2cde1 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs @@ -107,8 +107,6 @@ namespace Grpc.IntegrationTesting server.Start(); server.ShutdownTask.Wait(); - - GrpcEnvironment.Shutdown(); } private static ServerOptions ParseArguments(string[] args) diff --git a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs index 1c398eb84e..842795374f 100644 --- a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs @@ -85,9 +85,8 @@ namespace Grpc.IntegrationTesting [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index fc9470f93f..489e219c49 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -595,7 +595,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, grpc_metadata_array *initial_metadata, gpr_uint32 write_flags) { /* TODO: don't use magic number */ - grpc_op ops[5]; + grpc_op ops[4]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -615,23 +615,18 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming( ops[2].flags = 0; ops[2].reserved = NULL; - ops[3].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[3].data.recv_initial_metadata = &(ctx->recv_initial_metadata); - ops[3].flags = 0; - ops[3].reserved = NULL; - - ops[4].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[4].data.recv_status_on_client.trailing_metadata = + ops[3].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[3].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[4].data.recv_status_on_client.status = + ops[3].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[4].data.recv_status_on_client.status_details = + ops[3].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[4].data.recv_status_on_client.status_details_capacity = + ops[3].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); - ops[4].flags = 0; - ops[4].reserved = NULL; + ops[3].flags = 0; + ops[3].reserved = NULL; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, NULL); @@ -642,7 +637,7 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, grpcsharp_batch_context *ctx, grpc_metadata_array *initial_metadata) { /* TODO: don't use magic number */ - grpc_op ops[3]; + grpc_op ops[2]; ops[0].op = GRPC_OP_SEND_INITIAL_METADATA; grpcsharp_metadata_array_move(&(ctx->send_initial_metadata), initial_metadata); @@ -652,28 +647,36 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call, ops[0].flags = 0; ops[0].reserved = NULL; - ops[1].op = GRPC_OP_RECV_INITIAL_METADATA; - ops[1].data.recv_initial_metadata = &(ctx->recv_initial_metadata); - ops[1].flags = 0; - ops[1].reserved = NULL; - - ops[2].op = GRPC_OP_RECV_STATUS_ON_CLIENT; - ops[2].data.recv_status_on_client.trailing_metadata = + ops[1].op = GRPC_OP_RECV_STATUS_ON_CLIENT; + ops[1].data.recv_status_on_client.trailing_metadata = &(ctx->recv_status_on_client.trailing_metadata); - ops[2].data.recv_status_on_client.status = + ops[1].data.recv_status_on_client.status = &(ctx->recv_status_on_client.status); /* not using preallocation for status_details */ - ops[2].data.recv_status_on_client.status_details = + ops[1].data.recv_status_on_client.status_details = &(ctx->recv_status_on_client.status_details); - ops[2].data.recv_status_on_client.status_details_capacity = + ops[1].data.recv_status_on_client.status_details_capacity = &(ctx->recv_status_on_client.status_details_capacity); - ops[2].flags = 0; - ops[2].reserved = NULL; + ops[1].flags = 0; + ops[1].reserved = NULL; return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, NULL); } +GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_recv_initial_metadata( + grpc_call *call, grpcsharp_batch_context *ctx) { + /* TODO: don't use magic number */ + grpc_op ops[1]; + ops[0].op = GRPC_OP_RECV_INITIAL_METADATA; + ops[0].data.recv_initial_metadata = &(ctx->recv_initial_metadata); + ops[0].flags = 0; + ops[0].reserved = NULL; + + return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx, + NULL); +} + GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx, const char *send_buffer, size_t send_buffer_len, diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc index 1b8e7b43fb..6e17197e16 100644 --- a/src/node/ext/server_credentials.cc +++ b/src/node/ext/server_credentials.cc @@ -41,6 +41,7 @@ namespace grpc { namespace node { +using v8::Array; using v8::Exception; using v8::External; using v8::Function; @@ -52,6 +53,7 @@ using v8::Local; using v8::Object; using v8::ObjectTemplate; using v8::Persistent; +using v8::String; using v8::Value; NanCallback *ServerCredentials::constructor; @@ -122,25 +124,66 @@ NAN_METHOD(ServerCredentials::CreateSsl) { // TODO: have the node API support multiple key/cert pairs. NanScope(); char *root_certs = NULL; - grpc_ssl_pem_key_cert_pair key_cert_pair; if (::node::Buffer::HasInstance(args[0])) { root_certs = ::node::Buffer::Data(args[0]); } else if (!(args[0]->IsNull() || args[0]->IsUndefined())) { return NanThrowTypeError( "createSSl's first argument must be a Buffer if provided"); } - if (!::node::Buffer::HasInstance(args[1])) { - return NanThrowTypeError("createSsl's second argument must be a Buffer"); + if (!args[1]->IsArray()) { + return NanThrowTypeError( + "createSsl's second argument must be a list of objects"); + } + int force_client_auth = 0; + if (args[2]->IsBoolean()) { + force_client_auth = (int)args[2]->BooleanValue(); + } else if (!(args[2]->IsUndefined() || args[2]->IsNull())) { + return NanThrowTypeError( + "createSsl's third argument must be a boolean if provided"); } - key_cert_pair.private_key = ::node::Buffer::Data(args[1]); - if (!::node::Buffer::HasInstance(args[2])) { - return NanThrowTypeError("createSsl's third argument must be a Buffer"); + Handle<Array> pair_list = Local<Array>::Cast(args[1]); + uint32_t key_cert_pair_count = pair_list->Length(); + grpc_ssl_pem_key_cert_pair *key_cert_pairs = new grpc_ssl_pem_key_cert_pair[ + key_cert_pair_count]; + + Handle<String> key_key = NanNew("private_key"); + Handle<String> cert_key = NanNew("cert_chain"); + + for(uint32_t i = 0; i < key_cert_pair_count; i++) { + if (!pair_list->Get(i)->IsObject()) { + delete key_cert_pairs; + return NanThrowTypeError("Key/cert pairs must be objects"); + } + Handle<Object> pair_obj = pair_list->Get(i)->ToObject(); + if (!pair_obj->HasOwnProperty(key_key)) { + delete key_cert_pairs; + return NanThrowTypeError( + "Key/cert pairs must have a private_key and a cert_chain"); + } + if (!pair_obj->HasOwnProperty(cert_key)) { + delete key_cert_pairs; + return NanThrowTypeError( + "Key/cert pairs must have a private_key and a cert_chain"); + } + if (!::node::Buffer::HasInstance(pair_obj->Get(key_key))) { + delete key_cert_pairs; + return NanThrowTypeError("private_key must be a Buffer"); + } + if (!::node::Buffer::HasInstance(pair_obj->Get(cert_key))) { + delete key_cert_pairs; + return NanThrowTypeError("cert_chain must be a Buffer"); + } + key_cert_pairs[i].private_key = ::node::Buffer::Data( + pair_obj->Get(key_key)); + key_cert_pairs[i].cert_chain = ::node::Buffer::Data( + pair_obj->Get(cert_key)); } - key_cert_pair.cert_chain = ::node::Buffer::Data(args[2]); - // TODO Add a force_client_auth parameter and pass it as the last parameter - // here. grpc_server_credentials *creds = - grpc_ssl_server_credentials_create(root_certs, &key_cert_pair, 1, 0); + grpc_ssl_server_credentials_create(root_certs, + key_cert_pairs, + key_cert_pair_count, + force_client_auth); + delete key_cert_pairs; if (creds == NULL) { NanReturnNull(); } diff --git a/src/node/health_check/health.js b/src/node/health_check/health.js index 87e00197fe..84d7e0568e 100644 --- a/src/node/health_check/health.js +++ b/src/node/health_check/health.js @@ -45,17 +45,13 @@ function HealthImplementation(statusMap) { this.statusMap = _.clone(statusMap); } -HealthImplementation.prototype.setStatus = function(host, service, status) { - if (!this.statusMap[host]) { - this.statusMap[host] = {}; - } - this.statusMap[host][service] = status; +HealthImplementation.prototype.setStatus = function(service, status) { + this.statusMap[service] = status; }; HealthImplementation.prototype.check = function(call, callback){ - var host = call.request.host; var service = call.request.service; - var status = _.get(this.statusMap, [host, service], null); + var status = _.get(this.statusMap, service, null); if (status === null) { callback({code:grpc.status.NOT_FOUND}); } else { diff --git a/src/node/health_check/health.proto b/src/node/health_check/health.proto index d31df1e0a7..57f4aaa9c0 100644 --- a/src/node/health_check/health.proto +++ b/src/node/health_check/health.proto @@ -32,8 +32,7 @@ syntax = "proto3"; package grpc.health.v1alpha; message HealthCheckRequest { - string host = 1; - string service = 2; + string service = 1; } message HealthCheckResponse { @@ -47,4 +46,4 @@ message HealthCheckResponse { service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); -}
\ No newline at end of file +} diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js index 1242a0f939..99155e9958 100644 --- a/src/node/interop/interop_server.js +++ b/src/node/interop/interop_server.js @@ -169,8 +169,8 @@ function getServer(port, tls) { var key_data = fs.readFileSync(key_path); var pem_data = fs.readFileSync(pem_path); server_creds = grpc.ServerCredentials.createSsl(null, - key_data, - pem_data); + [{private_key: key_data, + cert_chain: pem_data}]); } else { server_creds = grpc.ServerCredentials.createInsecure(); } diff --git a/src/node/src/client.js b/src/node/src/client.js index 48fe0dd3b7..7b7eae51d2 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -280,7 +280,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { } var client_batch = {}; var message = serialize(argument); - message.grpcWriteFlags = options.flags; + if (options) { + message.grpcWriteFlags = options.flags; + } client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; @@ -416,7 +418,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { } var start_batch = {}; var message = serialize(argument); - message.grpcWriteFlags = options.flags; + if (options) { + message.grpcWriteFlags = options.flags; + } start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; start_batch[grpc.opType.SEND_MESSAGE] = message; diff --git a/src/node/test/health_test.js b/src/node/test/health_test.js index be4ef1d251..04959f5f55 100644 --- a/src/node/test/health_test.js +++ b/src/node/test/health_test.js @@ -41,13 +41,9 @@ var grpc = require('../'); describe('Health Checking', function() { var statusMap = { - '': { - '': 'SERVING', - 'grpc.test.TestService': 'NOT_SERVING', - }, - virtual_host: { - 'grpc.test.TestService': 'SERVING' - } + '': 'SERVING', + 'grpc.test.TestServiceNotServing': 'NOT_SERVING', + 'grpc.test.TestServiceServing': 'SERVING' }; var healthServer = new grpc.Server(); healthServer.addProtoService(health.service, @@ -71,15 +67,15 @@ describe('Health Checking', function() { }); }); it('should say that a disabled service is NOT_SERVING', function(done) { - healthClient.check({service: 'grpc.test.TestService'}, + healthClient.check({service: 'grpc.test.TestServiceNotServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'NOT_SERVING'); done(); }); }); - it('should say that a service on another host is SERVING', function(done) { - healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'}, + it('should say that an enabled service is SERVING', function(done) { + healthClient.check({service: 'grpc.test.TestServiceServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'SERVING'); @@ -93,12 +89,4 @@ describe('Health Checking', function() { done(); }); }); - it('should get NOT_FOUND if the host is not registered', function(done) { - healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'}, - function(err, response) { - assert(err); - assert.strictEqual(err.code, grpc.status.NOT_FOUND); - done(); - }); - }); }); diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js index 20c9a07ffa..78bac8da29 100644 --- a/src/node/test/server_test.js +++ b/src/node/test/server_test.js @@ -70,7 +70,9 @@ describe('server', function() { var pem_path = path.join(__dirname, '../test/data/server1.pem'); var key_data = fs.readFileSync(key_path); var pem_data = fs.readFileSync(pem_path); - var creds = grpc.ServerCredentials.createSsl(null, key_data, pem_data); + var creds = grpc.ServerCredentials.createSsl(null, + [{private_key: key_data, + cert_chain: pem_data}]); assert.doesNotThrow(function() { port = server.addHttp2Port('0.0.0.0:0', creds); }); diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index 8b7e67f57c..287621d930 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -39,6 +39,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { protected static $client; protected static $timeout; + public function testWaitForNotReady() { + $this->assertFalse(self::$client->waitForReady(1)); + } + + public function testWaitForReady() { + $this->assertTrue(self::$client->waitForReady(250000)); + } + public function testSimpleRequest() { $div_arg = new math\DivArgs(); $div_arg->setDividend(7); diff --git a/src/php/tests/generated_code/GeneratedCodeTest.php b/src/php/tests/generated_code/GeneratedCodeTest.php index 1e4742fc80..a1a2ce81db 100755 --- a/src/php/tests/generated_code/GeneratedCodeTest.php +++ b/src/php/tests/generated_code/GeneratedCodeTest.php @@ -35,7 +35,7 @@ require 'AbstractGeneratedCodeTest.php'; class GeneratedCodeTest extends AbstractGeneratedCodeTest { public static function setUpBeforeClass() { - self::$client = new math\MathClient(new Grpc\BaseStub( - getenv('GRPC_TEST_HOST'), [])); + self::$client = new math\MathClient( + getenv('GRPC_TEST_HOST'), []); } } diff --git a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php index f8ec1e7da8..68f57d34ad 100644 --- a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php +++ b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php @@ -35,13 +35,13 @@ require 'AbstractGeneratedCodeTest.php'; class GeneratedCodeWithCallbackTest extends AbstractGeneratedCodeTest { public static function setUpBeforeClass() { - self::$client = new math\MathClient(new Grpc\BaseStub( + self::$client = new math\MathClient( getenv('GRPC_TEST_HOST'), ['update_metadata' => function($a_hash, $client = array()) { $a_copy = $a_hash; $a_copy['foo'] = ['bar']; return $a_copy; - }])); + }]); } } diff --git a/src/python/grpcio/grpc/framework/core/__init__.py b/src/python/grpcio/grpc/framework/core/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio/grpc/framework/core/_constants.py b/src/python/grpcio/grpc/framework/core/_constants.py new file mode 100644 index 0000000000..d3be3a4c4a --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_constants.py @@ -0,0 +1,59 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Private constants for the package.""" + +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links + +TICKET_SUBSCRIPTION_FOR_BASE_SUBSCRIPTION_KIND = { + base.Subscription.Kind.NONE: links.Ticket.Subscription.NONE, + base.Subscription.Kind.TERMINATION_ONLY: + links.Ticket.Subscription.TERMINATION, + base.Subscription.Kind.FULL: links.Ticket.Subscription.FULL, + } + +# Mapping from abortive operation outcome to ticket termination to be +# sent to the other side of the operation, or None to indicate that no +# ticket should be sent to the other side in the event of such an +# outcome. +ABORTION_OUTCOME_TO_TICKET_TERMINATION = { + base.Outcome.CANCELLED: links.Ticket.Termination.CANCELLATION, + base.Outcome.EXPIRED: links.Ticket.Termination.EXPIRATION, + base.Outcome.LOCAL_SHUTDOWN: links.Ticket.Termination.SHUTDOWN, + base.Outcome.REMOTE_SHUTDOWN: None, + base.Outcome.RECEPTION_FAILURE: links.Ticket.Termination.RECEPTION_FAILURE, + base.Outcome.TRANSMISSION_FAILURE: None, + base.Outcome.LOCAL_FAILURE: links.Ticket.Termination.LOCAL_FAILURE, + base.Outcome.REMOTE_FAILURE: links.Ticket.Termination.REMOTE_FAILURE, +} + +INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Core) internal error! )-:' +TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE = ( + 'Exception calling termination callback!') diff --git a/src/python/grpcio/grpc/framework/core/_context.py b/src/python/grpcio/grpc/framework/core/_context.py new file mode 100644 index 0000000000..24a12b612e --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_context.py @@ -0,0 +1,92 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation context.""" + +import time + +# _interfaces is referenced from specification in this module. +from grpc.framework.core import _interfaces # pylint: disable=unused-import +from grpc.framework.interfaces.base import base + + +class OperationContext(base.OperationContext): + """An implementation of interfaces.OperationContext.""" + + def __init__( + self, lock, termination_manager, transmission_manager, + expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + + def _abort(self, outcome): + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def outcome(self): + """See base.OperationContext.outcome for specification.""" + with self._lock: + return self._termination_manager.outcome + + def add_termination_callback(self, callback): + """See base.OperationContext.add_termination_callback.""" + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.add_callback(callback) + return None + else: + return self._termination_manager.outcome + + def time_remaining(self): + """See base.OperationContext.time_remaining for specification.""" + with self._lock: + deadline = self._expiration_manager.deadline() + return max(0.0, deadline - time.time()) + + def cancel(self): + """See base.OperationContext.cancel for specification.""" + self._abort(base.Outcome.CANCELLED) + + def fail(self, exception): + """See base.OperationContext.fail for specification.""" + self._abort(base.Outcome.LOCAL_FAILURE) diff --git a/src/python/grpcio/grpc/framework/core/_emission.py b/src/python/grpcio/grpc/framework/core/_emission.py new file mode 100644 index 0000000000..7c702ab2ce --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_emission.py @@ -0,0 +1,97 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for handling emitted values.""" + +from grpc.framework.core import _interfaces +from grpc.framework.interfaces.base import base + + +class EmissionManager(_interfaces.EmissionManager): + """An EmissionManager implementation.""" + + def __init__( + self, lock, termination_manager, transmission_manager, + expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._ingestion_manager = None + + self._initial_metadata_seen = False + self._payload_seen = False + self._completion_seen = False + + def set_ingestion_manager(self, ingestion_manager): + """Sets the ingestion manager with which this manager will cooperate. + + Args: + ingestion_manager: The _interfaces.IngestionManager for the operation. + """ + self._ingestion_manager = ingestion_manager + + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + initial_metadata_present = initial_metadata is not None + payload_present = payload is not None + completion_present = completion is not None + allowance_present = allowance is not None + with self._lock: + if self._termination_manager.outcome is None: + if (initial_metadata_present and ( + self._initial_metadata_seen or self._payload_seen or + self._completion_seen) or + payload_present and self._completion_seen or + completion_present and self._completion_seen or + allowance_present and allowance <= 0): + self._termination_manager.abort(base.Outcome.LOCAL_FAILURE) + self._transmission_manager.abort(base.Outcome.LOCAL_FAILURE) + self._expiration_manager.terminate() + else: + self._initial_metadata_seen |= initial_metadata_present + self._payload_seen |= payload_present + self._completion_seen |= completion_present + if completion_present: + self._termination_manager.emission_complete() + self._ingestion_manager.local_emissions_done() + self._transmission_manager.advance( + initial_metadata, payload, completion, allowance) + if allowance_present: + self._ingestion_manager.add_local_allowance(allowance) diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py new file mode 100644 index 0000000000..fb2c532df6 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -0,0 +1,251 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementation of base.End.""" + +import abc +import enum +import threading +import uuid + +from grpc.framework.core import _operation +from grpc.framework.core import _utilities +from grpc.framework.foundation import callable_util +from grpc.framework.foundation import later +from grpc.framework.foundation import logging_pool +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links +from grpc.framework.interfaces.links import utilities + +_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' + + +class End(base.End, links.Link): + """A bridge between base.End and links.Link. + + Implementations of this interface translate arriving tickets into + calls on application objects implementing base interfaces and + translate calls from application objects implementing base interfaces + into tickets sent to a joined link. + """ + __metaclass__ = abc.ABCMeta + + +class _Cycle(object): + """State for a single start-stop End lifecycle.""" + + def __init__(self, pool): + self.pool = pool + self.grace = False + self.futures = [] + self.operations = {} + self.idle_actions = [] + + +def _abort(operations): + for operation in operations: + operation.abort(base.Outcome.LOCAL_SHUTDOWN) + + +def _cancel_futures(futures): + for future in futures: + futures.cancel() + + +def _future_shutdown(lock, cycle, event): + def in_future(): + with lock: + _abort(cycle.operations.values()) + _cancel_futures(cycle.futures) + pool = cycle.pool + cycle.pool.shutdown(wait=True) + return in_future + + +def _termination_action(lock, stats, operation_id, cycle): + """Constructs the termination action for a single operation. + + Args: + lock: A lock to hold during the termination action. + states: A mapping from base.Outcome values to integers to increment with + the outcome given to the termination action. + operation_id: The operation ID for the termination action. + cycle: A _Cycle value to be updated during the termination action. + + Returns: + A callable that takes an operation outcome as its sole parameter and that + should be used as the termination action for the operation associated + with the given operation ID. + """ + def termination_action(outcome): + with lock: + stats[outcome] += 1 + cycle.operations.pop(operation_id, None) + if not cycle.operations: + for action in cycle.idle_actions: + cycle.pool.submit(action) + cycle.idle_actions = [] + if cycle.grace: + _cancel_futures(cycle.futures) + return termination_action + + +class _End(End): + """An End implementation.""" + + def __init__(self, servicer_package): + """Constructor. + + Args: + servicer_package: A _ServicerPackage for servicing operations or None if + this end will not be used to service operations. + """ + self._lock = threading.Condition() + self._servicer_package = servicer_package + + self._stats = {outcome: 0 for outcome in base.Outcome} + + self._mate = None + + self._cycle = None + + def start(self): + """See base.End.start for specification.""" + with self._lock: + if self._cycle is not None: + raise ValueError('Tried to start a not-stopped End!') + else: + self._cycle = _Cycle(logging_pool.pool(1)) + + def stop(self, grace): + """See base.End.stop for specification.""" + with self._lock: + if self._cycle is None: + event = threading.Event() + event.set() + return event + elif not self._cycle.operations: + event = threading.Event() + self._cycle.pool.submit(event.set) + self._cycle.pool.shutdown(wait=False) + self._cycle = None + return event + else: + self._cycle.grace = True + event = threading.Event() + self._cycle.idle_actions.append(event.set) + if 0 < grace: + future = later.later( + grace, _future_shutdown(self._lock, self._cycle, event)) + self._cycle.futures.append(future) + else: + _abort(self._cycle.operations.values()) + return event + + def operate( + self, group, method, subscription, timeout, initial_metadata=None, + payload=None, completion=None): + """See base.End.operate for specification.""" + operation_id = uuid.uuid4() + with self._lock: + if self._cycle is None or self._cycle.grace: + raise ValueError('Can\'t operate on stopped or stopping End!') + termination_action = _termination_action( + self._lock, self._stats, operation_id, self._cycle) + operation = _operation.invocation_operate( + operation_id, group, method, subscription, timeout, initial_metadata, + payload, completion, self._mate.accept_ticket, termination_action, + self._cycle.pool) + self._cycle.operations[operation_id] = operation + return operation.context, operation.operator + + def operation_stats(self): + """See base.End.operation_stats for specification.""" + with self._lock: + return dict(self._stats) + + def add_idle_action(self, action): + """See base.End.add_idle_action for specification.""" + with self._lock: + if self._cycle is None: + raise ValueError('Can\'t add idle action to stopped End!') + action_with_exceptions_logged = callable_util.with_exceptions_logged( + action, _IDLE_ACTION_EXCEPTION_LOG_MESSAGE) + if self._cycle.operations: + self._cycle.idle_actions.append(action_with_exceptions_logged) + else: + self._cycle.pool.submit(action_with_exceptions_logged) + + def accept_ticket(self, ticket): + """See links.Link.accept_ticket for specification.""" + with self._lock: + if self._cycle is not None and not self._cycle.grace: + operation = self._cycle.operations.get(ticket.operation_id) + if operation is not None: + operation.handle_ticket(ticket) + elif self._servicer_package is not None: + termination_action = _termination_action( + self._lock, self._stats, ticket.operation_id, self._cycle) + operation = _operation.service_operate( + self._servicer_package, ticket, self._mate.accept_ticket, + termination_action, self._cycle.pool) + if operation is not None: + self._cycle.operations[ticket.operation_id] = operation + + def join_link(self, link): + """See links.Link.join_link for specification.""" + with self._lock: + self._mate = utilities.NULL_LINK if link is None else link + + +def serviceless_end_link(): + """Constructs an End usable only for invoking operations. + + Returns: + An End usable for translating operations into ticket exchange. + """ + return _End(None) + + +def serviceful_end_link(servicer, default_timeout, maximum_timeout): + """Constructs an End capable of servicing operations. + + Args: + servicer: An interfaces.Servicer for servicing operations. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + An End capable of servicing the operations requested of it through ticket + exchange. + """ + return _End( + _utilities.ServicerPackage(servicer, default_timeout, maximum_timeout)) diff --git a/src/python/grpcio/grpc/framework/core/_expiration.py b/src/python/grpcio/grpc/framework/core/_expiration.py new file mode 100644 index 0000000000..d94bdf2d2b --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_expiration.py @@ -0,0 +1,152 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation expiration.""" + +import time + +from grpc.framework.core import _interfaces +from grpc.framework.foundation import later +from grpc.framework.interfaces.base import base + + +class _ExpirationManager(_interfaces.ExpirationManager): + """An implementation of _interfaces.ExpirationManager.""" + + def __init__( + self, commencement, timeout, maximum_timeout, lock, termination_manager, + transmission_manager): + """Constructor. + + Args: + commencement: The time in seconds since the epoch at which the operation + began. + timeout: A length of time in seconds to allow for the operation to run. + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run despite what is requested via this object's + change_timout method. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._commencement = commencement + self._maximum_timeout = maximum_timeout + + self._timeout = timeout + self._deadline = commencement + timeout + self._index = None + self._future = None + + def _expire(self, index): + def expire(): + with self._lock: + if self._future is not None and index == self._index: + self._future = None + self._termination_manager.expire() + self._transmission_manager.abort(base.Outcome.EXPIRED) + return expire + + def start(self): + self._index = 0 + self._future = later.later(self._timeout, self._expire(0)) + + def change_timeout(self, timeout): + if self._future is not None and timeout != self._timeout: + self._future.cancel() + new_timeout = min(timeout, self._maximum_timeout) + new_index = self._index + 1 + self._timeout = new_timeout + self._deadline = self._commencement + new_timeout + self._index = new_index + delay = self._deadline - time.time() + self._future = later.later(delay, self._expire(new_index)) + if new_timeout != timeout: + self._transmission_manager.timeout(new_timeout) + + def deadline(self): + return self._deadline + + def terminate(self): + if self._future: + self._future.cancel() + self._future = None + self._deadline_index = None + + +def invocation_expiration_manager( + timeout, lock, termination_manager, transmission_manager): + """Creates an _interfaces.ExpirationManager appropriate for front-side use. + + Args: + timeout: A length of time in seconds to allow for the operation to run. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + + Returns: + An _interfaces.ExpirationManager appropriate for invocation-side use. + """ + expiration_manager = _ExpirationManager( + time.time(), timeout, timeout, lock, termination_manager, + transmission_manager) + expiration_manager.start() + return expiration_manager + + +def service_expiration_manager( + timeout, default_timeout, maximum_timeout, lock, termination_manager, + transmission_manager): + """Creates an _interfaces.ExpirationManager appropriate for back-side use. + + Args: + timeout: A length of time in seconds to allow for the operation to run. May + be None in which case default_timeout will be used. + default_timeout: The default length of time in seconds to allow for the + operation to run if the front-side customer has not specified such a value + (or if the value they specified is not yet known). + maximum_timeout: The maximum length of time in seconds to allow for the + operation to run. + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + + Returns: + An _interfaces.ExpirationManager appropriate for service-side use. + """ + expiration_manager = _ExpirationManager( + time.time(), default_timeout if timeout is None else timeout, + maximum_timeout, lock, termination_manager, transmission_manager) + expiration_manager.start() + return expiration_manager diff --git a/src/python/grpcio/grpc/framework/core/_ingestion.py b/src/python/grpcio/grpc/framework/core/_ingestion.py new file mode 100644 index 0000000000..59f7f8adc8 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_ingestion.py @@ -0,0 +1,410 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ingestion during an operation.""" + +import abc +import collections + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import abandonment +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base + +_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!' +_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!' + + +class _SubscriptionCreation(collections.namedtuple( + '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))): + """A sum type for the outcome of ingestion initialization. + + Either subscription will be non-None, remote_error will be True, or abandoned + will be True. + + Attributes: + subscription: A base.Subscription describing the customer's interest in + operation values from the other side. + remote_error: A boolean indicating that the subscription could not be + created due to an error on the remote side of the operation. + abandoned: A boolean indicating that subscription creation was abandoned. + """ + + +class _SubscriptionCreator(object): + """Common specification of subscription-creating behavior.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def create(self, group, method): + """Creates the base.Subscription of the local customer. + + Any exceptions raised by this method should be attributed to and treated as + defects in the customer code called by this method. + + Args: + group: The group identifier of the operation. + method: The method identifier of the operation. + + Returns: + A _SubscriptionCreation describing the result of subscription creation. + """ + raise NotImplementedError() + + +class _ServiceSubscriptionCreator(_SubscriptionCreator): + """A _SubscriptionCreator appropriate for service-side use.""" + + def __init__(self, servicer, operation_context, output_operator): + """Constructor. + + Args: + servicer: The base.Servicer that will service the operation. + operation_context: A base.OperationContext for the operation to be passed + to the customer. + output_operator: A base.Operator for the operation to be passed to the + customer and to be called by the customer to accept operation data + emitted by the customer. + """ + self._servicer = servicer + self._operation_context = operation_context + self._output_operator = output_operator + + def create(self, group, method): + try: + subscription = self._servicer.service( + group, method, self._operation_context, self._output_operator) + except base.NoSuchMethodError: + return _SubscriptionCreation(None, True, False) + except abandonment.Abandoned: + return _SubscriptionCreation(None, False, True) + else: + return _SubscriptionCreation(subscription, False, False) + + +def _wrap(behavior): + def wrapped(*args, **kwargs): + try: + behavior(*args, **kwargs) + except abandonment.Abandoned: + return False + else: + return True + return wrapped + + +class _IngestionManager(_interfaces.IngestionManager): + """An implementation of _interfaces.IngestionManager.""" + + def __init__( + self, lock, pool, subscription, subscription_creator, termination_manager, + transmission_manager, expiration_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + subscription: A base.Subscription describing the customer's interest in + operation values from the other side. May be None if + subscription_creator is not None. + subscription_creator: A _SubscriptionCreator wrapping the portion of + customer code that when called returns the base.Subscription describing + the customer's interest in operation values from the other side. May be + None if subscription is not None. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + """ + self._lock = lock + self._pool = pool + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + + if subscription is None: + self._subscription_creator = subscription_creator + self._wrapped_operator = None + elif subscription.kind is base.Subscription.Kind.FULL: + self._subscription_creator = None + self._wrapped_operator = _wrap(subscription.operator.advance) + else: + # TODO(nathaniel): Support other subscriptions. + raise ValueError('Unsupported subscription "%s"!' % subscription.kind) + self._pending_initial_metadata = None + self._pending_payloads = [] + self._pending_completion = None + self._local_allowance = 1 + # A nonnegative integer or None, with None indicating that the local + # customer is done emitting anyway so there's no need to bother it by + # informing it that the remote customer has granted it further permission to + # emit. + self._remote_allowance = 0 + self._processing = False + + def _abort_internal_only(self): + self._subscription_creator = None + self._wrapped_operator = None + self._pending_initial_metadata = None + self._pending_payloads = None + self._pending_completion = None + + def _abort_and_notify(self, outcome): + self._abort_internal_only() + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def _operator_next(self): + """Computes the next step for full-subscription ingestion. + + Returns: + An initial_metadata, payload, completion, allowance, continue quintet + indicating what operation values (if any) are available to pass into + customer code and whether or not there is anything immediately + actionable to call customer code to do. + """ + if self._wrapped_operator is None: + return None, None, None, None, False + else: + initial_metadata, payload, completion, allowance, action = [None] * 5 + if self._pending_initial_metadata is not None: + initial_metadata = self._pending_initial_metadata + self._pending_initial_metadata = None + action = True + if self._pending_payloads and 0 < self._local_allowance: + payload = self._pending_payloads.pop(0) + self._local_allowance -= 1 + action = True + if not self._pending_payloads and self._pending_completion is not None: + completion = self._pending_completion + self._pending_completion = None + action = True + if self._remote_allowance is not None and 0 < self._remote_allowance: + allowance = self._remote_allowance + self._remote_allowance = 0 + action = True + return initial_metadata, payload, completion, allowance, bool(action) + + def _operator_process( + self, wrapped_operator, initial_metadata, payload, + completion, allowance): + while True: + advance_outcome = callable_util.call_logging_exceptions( + wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE, + initial_metadata=initial_metadata, payload=payload, + completion=completion, allowance=allowance) + if advance_outcome.exception is None: + if advance_outcome.return_value: + with self._lock: + if self._termination_manager.outcome is not None: + return + if completion is not None: + self._termination_manager.ingestion_complete() + initial_metadata, payload, completion, allowance, moar = ( + self._operator_next()) + if not moar: + self._processing = False + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + return + + def _operator_post_create(self, subscription): + wrapped_operator = _wrap(subscription.operator.advance) + with self._lock: + if self._termination_manager.outcome is not None: + return + self._wrapped_operator = wrapped_operator + self._subscription_creator = None + metadata, payload, completion, allowance, moar = self._operator_next() + if not moar: + self._processing = False + return + self._operator_process( + wrapped_operator, metadata, payload, completion, allowance) + + def _create(self, subscription_creator, group, name): + outcome = callable_util.call_logging_exceptions( + subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE, + group, name) + if outcome.return_value is None: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + elif outcome.return_value.abandoned: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.LOCAL_FAILURE) + elif outcome.return_value.remote_error: + with self._lock: + if self._termination_manager.outcome is None: + self._abort_and_notify(base.Outcome.REMOTE_FAILURE) + elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL: + self._operator_post_create(outcome.return_value.subscription) + else: + # TODO(nathaniel): Support other subscriptions. + raise ValueError( + 'Unsupported "%s"!' % outcome.return_value.subscription.kind) + + def _store_advance(self, initial_metadata, payload, completion, allowance): + if initial_metadata is not None: + self._pending_initial_metadata = initial_metadata + if payload is not None: + self._pending_payloads.append(payload) + if completion is not None: + self._pending_completion = completion + if allowance is not None and self._remote_allowance is not None: + self._remote_allowance += allowance + + def _operator_advance(self, initial_metadata, payload, completion, allowance): + if self._processing: + self._store_advance(initial_metadata, payload, completion, allowance) + else: + action = False + if initial_metadata is not None: + action = True + if payload is not None: + if 0 < self._local_allowance: + self._local_allowance -= 1 + action = True + else: + self._pending_payloads.append(payload) + payload = False + if completion is not None: + if self._pending_payloads: + self._pending_completion = completion + else: + action = True + if allowance is not None and self._remote_allowance is not None: + allowance += self._remote_allowance + self._remote_allowance = 0 + action = True + if action: + self._pool.submit( + callable_util.with_exceptions_logged( + self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._wrapped_operator, initial_metadata, payload, completion, + allowance) + + def set_group_and_method(self, group, method): + """See _interfaces.IngestionManager.set_group_and_method for spec.""" + if self._subscription_creator is not None and not self._processing: + self._pool.submit( + callable_util.with_exceptions_logged( + self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE), + self._subscription_creator, group, method) + self._processing = True + + def add_local_allowance(self, allowance): + """See _interfaces.IngestionManager.add_local_allowance for spec.""" + if any((self._subscription_creator, self._wrapped_operator,)): + self._local_allowance += allowance + if not self._processing: + initial_metadata, payload, completion, allowance, moar = ( + self._operator_next()) + if moar: + self._pool.submit( + callable_util.with_exceptions_logged( + self._operator_process, + _constants.INTERNAL_ERROR_LOG_MESSAGE), + initial_metadata, payload, completion, allowance) + + def local_emissions_done(self): + self._remote_allowance = None + + def advance(self, initial_metadata, payload, completion, allowance): + """See _interfaces.IngestionManager.advance for specification.""" + if self._subscription_creator is not None: + self._store_advance(initial_metadata, payload, completion, allowance) + elif self._wrapped_operator is not None: + self._operator_advance(initial_metadata, payload, completion, allowance) + + +def invocation_ingestion_manager( + subscription, lock, pool, termination_manager, transmission_manager, + expiration_manager): + """Creates an IngestionManager appropriate for invocation-side use. + + Args: + subscription: A base.Subscription indicating the customer's interest in the + data and results from the service-side of the operation. + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + + Returns: + An IngestionManager appropriate for invocation-side use. + """ + return _IngestionManager( + lock, pool, subscription, None, termination_manager, transmission_manager, + expiration_manager) + + +def service_ingestion_manager( + servicer, operation_context, output_operator, lock, pool, + termination_manager, transmission_manager, expiration_manager): + """Creates an IngestionManager appropriate for service-side use. + + The returned IngestionManager will require its set_group_and_name method to be + called before its advance method may be called. + + Args: + servicer: A base.Servicer for servicing the operation. + operation_context: A base.OperationContext for the operation to be passed to + the customer. + output_operator: A base.Operator for the operation to be passed to the + customer and to be called by the customer to accept operation data output + by the customer. + lock: The operation-wide lock. + pool: A thread pool in which to execute customer code. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + + Returns: + An IngestionManager appropriate for service-side use. + """ + subscription_creator = _ServiceSubscriptionCreator( + servicer, operation_context, output_operator) + return _IngestionManager( + lock, pool, None, subscription_creator, termination_manager, + transmission_manager, expiration_manager) diff --git a/src/python/grpcio/grpc/framework/core/_interfaces.py b/src/python/grpcio/grpc/framework/core/_interfaces.py new file mode 100644 index 0000000000..a626b9f767 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_interfaces.py @@ -0,0 +1,308 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Package-internal interfaces.""" + +import abc + +from grpc.framework.interfaces.base import base + + +class TerminationManager(object): + """An object responsible for handling the termination of an operation. + + Attributes: + outcome: None if the operation is active or a base.Outcome value if it has + terminated. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def add_callback(self, callback): + """Registers a callback to be called on operation termination. + + If the operation has already terminated the callback will not be called. + + Args: + callback: A callable that will be passed an interfaces.Outcome value. + + Returns: + None if the operation has not yet terminated and the passed callback will + be called when it does, or a base.Outcome value describing the operation + termination if the operation has terminated and the callback will not be + called as a result of this method call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def emission_complete(self): + """Indicates that emissions from customer code have completed.""" + raise NotImplementedError() + + @abc.abstractmethod + def transmission_complete(self): + """Indicates that transmissions to the remote end are complete. + + Returns: + True if the operation has terminated or False if the operation remains + ongoing. + """ + raise NotImplementedError() + + @abc.abstractmethod + def reception_complete(self): + """Indicates that reception from the other side is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def ingestion_complete(self): + """Indicates that customer code ingestion of received values is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def expire(self): + """Indicates that the operation must abort because it has taken too long.""" + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Indicates that the operation must abort for the indicated reason. + + Args: + outcome: An interfaces.Outcome indicating operation abortion. + """ + raise NotImplementedError() + + +class TransmissionManager(object): + """A manager responsible for transmitting to the other end of an operation.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def kick_off( + self, group, method, timeout, initial_metadata, payload, completion, + allowance): + """Transmits the values associated with operation invocation.""" + raise NotImplementedError() + + @abc.abstractmethod + def advance(self, initial_metadata, payload, completion, allowance): + """Accepts values for transmission to the other end of the operation. + + Args: + initial_metadata: An initial metadata value to be transmitted to the other + side of the operation. May only ever be non-None once. + payload: A payload value. + completion: A base.Completion value. May only ever be non-None in the last + transmission to be made to the other side. + allowance: A positive integer communicating the number of additional + payloads allowed to be transmitted from the other side to this side of + the operation, or None if no additional allowance is being granted in + this call. + """ + raise NotImplementedError() + + @abc.abstractmethod + def timeout(self, timeout): + """Accepts for transmission to the other side a new timeout value. + + Args: + timeout: A positive float used as the new timeout value for the operation + to be transmitted to the other side. + """ + raise NotImplementedError() + + @abc.abstractmethod + def allowance(self, allowance): + """Indicates to this manager that the remote customer is allowing payloads. + + Args: + allowance: A positive integer indicating the number of additional payloads + the remote customer is allowing to be transmitted from this side of the + operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def remote_complete(self): + """Indicates to this manager that data from the remote side is complete.""" + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Indicates that the operation has aborted. + + Args: + outcome: An interfaces.Outcome for the operation. If None, indicates that + the operation abortion should not be communicated to the other side of + the operation. + """ + raise NotImplementedError() + + +class ExpirationManager(object): + """A manager responsible for aborting the operation if it runs out of time.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def change_timeout(self, timeout): + """Changes the timeout allotted for the operation. + + Operation duration is always measure from the beginning of the operation; + calling this method changes the operation's allotted time to timeout total + seconds, not timeout seconds from the time of this method call. + + Args: + timeout: A length of time in seconds to allow for the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deadline(self): + """Returns the time until which the operation is allowed to run. + + Returns: + The time (seconds since the epoch) at which the operation will expire. + """ + raise NotImplementedError() + + @abc.abstractmethod + def terminate(self): + """Indicates to this manager that the operation has terminated.""" + raise NotImplementedError() + + +class EmissionManager(base.Operator): + """A manager of values emitted by customer code.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def advance( + self, initial_metadata=None, payload=None, completion=None, + allowance=None): + """Accepts a value emitted by customer code. + + This method should only be called by customer code. + + Args: + initial_metadata: An initial metadata value emitted by the local customer + to be sent to the other side of the operation. + payload: A payload value emitted by the local customer to be sent to the + other side of the operation. + completion: A Completion value emitted by the local customer to be sent to + the other side of the operation. + allowance: A positive integer indicating an additional number of payloads + that the local customer is willing to accept from the other side of the + operation. + """ + raise NotImplementedError() + + +class IngestionManager(object): + """A manager responsible for executing customer code. + + This name of this manager comes from its responsibility to pass successive + values from the other side of the operation into the code of the local + customer. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_group_and_method(self, group, method): + """Communicates to this IngestionManager the operation group and method. + + Args: + group: The group identifier of the operation. + method: The method identifier of the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def add_local_allowance(self, allowance): + """Communicates to this IngestionManager that more payloads may be ingested. + + Args: + allowance: A positive integer indicating an additional number of payloads + that the local customer is willing to ingest. + """ + raise NotImplementedError() + + @abc.abstractmethod + def local_emissions_done(self): + """Indicates to this manager that local emissions are done.""" + raise NotImplementedError() + + @abc.abstractmethod + def advance(self, initial_metadata, payload, completion, allowance): + """Advances the operation by passing values to the local customer.""" + raise NotImplementedError() + + +class ReceptionManager(object): + """A manager responsible for receiving tickets from the other end.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def receive_ticket(self, ticket): + """Handle a ticket from the other side of the operation. + + Args: + ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket + appropriate to this end of the operation and this object. + """ + raise NotImplementedError() + + +class Operation(object): + """An ongoing operation. + + Attributes: + context: A base.OperationContext object for the operation. + operator: A base.Operator object for the operation for use by the customer + of the operation. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def handle_ticket(self, ticket): + """Handle a ticket from the other side of the operation. + + Args: + ticket: A links.Ticket from the other side of the operation. + """ + raise NotImplementedError() + + @abc.abstractmethod + def abort(self, outcome): + """Aborts the operation. + + Args: + outcome: A base.Outcome value indicating operation abortion. + """ + raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/core/_operation.py b/src/python/grpcio/grpc/framework/core/_operation.py new file mode 100644 index 0000000000..d20e40a53d --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_operation.py @@ -0,0 +1,192 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementation of operations.""" + +import threading + +# _utilities is referenced from specification in this module. +from grpc.framework.core import _context +from grpc.framework.core import _emission +from grpc.framework.core import _expiration +from grpc.framework.core import _ingestion +from grpc.framework.core import _interfaces +from grpc.framework.core import _reception +from grpc.framework.core import _termination +from grpc.framework.core import _transmission +from grpc.framework.core import _utilities # pylint: disable=unused-import + + +class _EasyOperation(_interfaces.Operation): + """A trivial implementation of interfaces.Operation.""" + + def __init__( + self, lock, termination_manager, transmission_manager, expiration_manager, + context, operator, reception_manager): + """Constructor. + + Args: + lock: The operation-wide lock. + termination_manager: The _interfaces.TerminationManager for the operation. + transmission_manager: The _interfaces.TransmissionManager for the + operation. + expiration_manager: The _interfaces.ExpirationManager for the operation. + context: A base.OperationContext for use by the customer during the + operation. + operator: A base.Operator for use by the customer during the operation. + reception_manager: The _interfaces.ReceptionManager for the operation. + """ + self._lock = lock + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._reception_manager = reception_manager + + self.context = context + self.operator = operator + + def handle_ticket(self, ticket): + with self._lock: + self._reception_manager.receive_ticket(ticket) + + def abort(self, outcome): + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + +def invocation_operate( + operation_id, group, method, subscription, timeout, initial_metadata, + payload, completion, ticket_sink, termination_action, pool): + """Constructs objects necessary for front-side operation management. + + Args: + operation_id: An object identifying the operation. + group: The group identifier of the operation. + method: The method identifier of the operation. + subscription: A base.Subscription describing the customer's interest in the + results of the operation. + timeout: A length of time in seconds to allow for the operation. + initial_metadata: An initial metadata value to be sent to the other side of + the operation. May be None if the initial metadata will be passed later or + if there will be no initial metadata passed at all. + payload: The first payload value to be transmitted to the other side. May be + None if there is no such value or if the customer chose not to pass it at + operation invocation. + completion: A base.Completion value indicating the end of values passed to + the other side of the operation. + ticket_sink: A callable that accepts links.Tickets and delivers them to the + other side of the operation. + termination_action: A callable that accepts the outcome of the operation as + a base.Outcome value to be called on operation completion. + pool: A thread pool with which to do the work of the operation. + + Returns: + An _interfaces.Operation for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.invocation_termination_manager( + termination_action, pool) + transmission_manager = _transmission.TransmissionManager( + operation_id, ticket_sink, lock, pool, termination_manager) + expiration_manager = _expiration.invocation_expiration_manager( + timeout, lock, termination_manager, transmission_manager) + operation_context = _context.OperationContext( + lock, termination_manager, transmission_manager, expiration_manager) + emission_manager = _emission.EmissionManager( + lock, termination_manager, transmission_manager, expiration_manager) + ingestion_manager = _ingestion.invocation_ingestion_manager( + subscription, lock, pool, termination_manager, transmission_manager, + expiration_manager) + reception_manager = _reception.ReceptionManager( + termination_manager, transmission_manager, expiration_manager, + ingestion_manager) + + termination_manager.set_expiration_manager(expiration_manager) + transmission_manager.set_expiration_manager(expiration_manager) + emission_manager.set_ingestion_manager(ingestion_manager) + + transmission_manager.kick_off( + group, method, timeout, initial_metadata, payload, completion, None) + + return _EasyOperation( + lock, termination_manager, transmission_manager, expiration_manager, + operation_context, emission_manager, reception_manager) + + +def service_operate( + servicer_package, ticket, ticket_sink, termination_action, pool): + """Constructs an Operation for service of an operation. + + Args: + servicer_package: A _utilities.ServicerPackage to be used servicing the + operation. + ticket: The first links.Ticket received for the operation. + ticket_sink: A callable that accepts links.Tickets and delivers them to the + other side of the operation. + termination_action: A callable that accepts the outcome of the operation as + a base.Outcome value to be called on operation completion. + pool: A thread pool with which to do the work of the operation. + + Returns: + An _interfaces.Operation for the operation. + """ + lock = threading.Lock() + with lock: + termination_manager = _termination.service_termination_manager( + termination_action, pool) + transmission_manager = _transmission.TransmissionManager( + ticket.operation_id, ticket_sink, lock, pool, termination_manager) + expiration_manager = _expiration.service_expiration_manager( + ticket.timeout, servicer_package.default_timeout, + servicer_package.maximum_timeout, lock, termination_manager, + transmission_manager) + operation_context = _context.OperationContext( + lock, termination_manager, transmission_manager, expiration_manager) + emission_manager = _emission.EmissionManager( + lock, termination_manager, transmission_manager, expiration_manager) + ingestion_manager = _ingestion.service_ingestion_manager( + servicer_package.servicer, operation_context, emission_manager, lock, + pool, termination_manager, transmission_manager, expiration_manager) + reception_manager = _reception.ReceptionManager( + termination_manager, transmission_manager, expiration_manager, + ingestion_manager) + + termination_manager.set_expiration_manager(expiration_manager) + transmission_manager.set_expiration_manager(expiration_manager) + emission_manager.set_ingestion_manager(ingestion_manager) + + reception_manager.receive_ticket(ticket) + + return _EasyOperation( + lock, termination_manager, transmission_manager, expiration_manager, + operation_context, emission_manager, reception_manager) diff --git a/src/python/grpcio/grpc/framework/core/_reception.py b/src/python/grpcio/grpc/framework/core/_reception.py new file mode 100644 index 0000000000..b64faf8146 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_reception.py @@ -0,0 +1,137 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ticket reception.""" + +from grpc.framework.core import _interfaces +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.base import utilities +from grpc.framework.interfaces.links import links + +_REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME = { + links.Ticket.Termination.CANCELLATION: base.Outcome.CANCELLED, + links.Ticket.Termination.EXPIRATION: base.Outcome.EXPIRED, + links.Ticket.Termination.SHUTDOWN: base.Outcome.REMOTE_SHUTDOWN, + links.Ticket.Termination.RECEPTION_FAILURE: base.Outcome.RECEPTION_FAILURE, + links.Ticket.Termination.TRANSMISSION_FAILURE: + base.Outcome.TRANSMISSION_FAILURE, + links.Ticket.Termination.LOCAL_FAILURE: base.Outcome.REMOTE_FAILURE, +} + + +class ReceptionManager(_interfaces.ReceptionManager): + """A ReceptionManager based around a _Receiver passed to it.""" + + def __init__( + self, termination_manager, transmission_manager, expiration_manager, + ingestion_manager): + """Constructor. + + Args: + termination_manager: The operation's _interfaces.TerminationManager. + transmission_manager: The operation's _interfaces.TransmissionManager. + expiration_manager: The operation's _interfaces.ExpirationManager. + ingestion_manager: The operation's _interfaces.IngestionManager. + """ + self._termination_manager = termination_manager + self._transmission_manager = transmission_manager + self._expiration_manager = expiration_manager + self._ingestion_manager = ingestion_manager + + self._lowest_unseen_sequence_number = 0 + self._out_of_sequence_tickets = {} + self._aborted = False + + def _abort(self, outcome): + self._aborted = True + self._termination_manager.abort(outcome) + self._transmission_manager.abort(outcome) + self._expiration_manager.terminate() + + def _sequence_failure(self, ticket): + """Determines a just-arrived ticket's sequential legitimacy. + + Args: + ticket: A just-arrived ticket. + + Returns: + True if the ticket is sequentially legitimate; False otherwise. + """ + if ticket.sequence_number < self._lowest_unseen_sequence_number: + return True + elif ticket.sequence_number in self._out_of_sequence_tickets: + return True + else: + return False + + def _process_one(self, ticket): + if ticket.sequence_number == 0: + self._ingestion_manager.set_group_and_method(ticket.group, ticket.method) + if ticket.timeout is not None: + self._expiration_manager.change_timeout(ticket.timeout) + if ticket.termination is None: + completion = None + else: + completion = utilities.completion( + ticket.terminal_metadata, ticket.code, ticket.message) + self._ingestion_manager.advance( + ticket.initial_metadata, ticket.payload, completion, ticket.allowance) + if ticket.allowance is not None: + self._transmission_manager.allowance(ticket.allowance) + + def _process(self, ticket): + """Process those tickets ready to be processed. + + Args: + ticket: A just-arrived ticket the sequence number of which matches this + _ReceptionManager's _lowest_unseen_sequence_number field. + """ + while True: + self._process_one(ticket) + next_ticket = self._out_of_sequence_tickets.pop( + ticket.sequence_number + 1, None) + if next_ticket is None: + self._lowest_unseen_sequence_number = ticket.sequence_number + 1 + return + else: + ticket = next_ticket + + def receive_ticket(self, ticket): + """See _interfaces.ReceptionManager.receive_ticket for specification.""" + if self._aborted: + return + elif self._sequence_failure(ticket): + self._abort(base.Outcome.RECEPTION_FAILURE) + elif ticket.termination not in (None, links.Ticket.Termination.COMPLETION): + outcome = _REMOTE_TICKET_TERMINATION_TO_LOCAL_OUTCOME[ticket.termination] + self._abort(outcome) + elif ticket.sequence_number == self._lowest_unseen_sequence_number: + self._process(ticket) + else: + self._out_of_sequence_tickets[ticket.sequence_number] = ticket diff --git a/src/python/grpcio/grpc/framework/core/_termination.py b/src/python/grpcio/grpc/framework/core/_termination.py new file mode 100644 index 0000000000..ad9f6123d8 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_termination.py @@ -0,0 +1,212 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for operation termination.""" + +import abc + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base + + +def _invocation_completion_predicate( + unused_emission_complete, unused_transmission_complete, + unused_reception_complete, ingestion_complete): + return ingestion_complete + + +def _service_completion_predicate( + unused_emission_complete, transmission_complete, unused_reception_complete, + unused_ingestion_complete): + return transmission_complete + + +class TerminationManager(_interfaces.TerminationManager): + """A _interfaces.TransmissionManager on which another manager may be set.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def set_expiration_manager(self, expiration_manager): + """Sets the expiration manager with which this manager will interact. + + Args: + expiration_manager: The _interfaces.ExpirationManager associated with the + current operation. + """ + raise NotImplementedError() + + +class _TerminationManager(TerminationManager): + """An implementation of TerminationManager.""" + + def __init__(self, predicate, action, pool): + """Constructor. + + Args: + predicate: One of _invocation_completion_predicate or + _service_completion_predicate to be used to determine when the operation + has completed. + action: A behavior to pass the operation outcome on operation termination. + pool: A thread pool. + """ + self._predicate = predicate + self._action = action + self._pool = pool + self._expiration_manager = None + + self.outcome = None + self._callbacks = [] + + self._emission_complete = False + self._transmission_complete = False + self._reception_complete = False + self._ingestion_complete = False + + def set_expiration_manager(self, expiration_manager): + self._expiration_manager = expiration_manager + + def _terminate_internal_only(self, outcome): + """Terminates the operation. + + Args: + outcome: A base.Outcome describing the outcome of the operation. + """ + self.outcome = outcome + callbacks = list(self._callbacks) + self._callbacks = None + + act = callable_util.with_exceptions_logged( + self._action, _constants.INTERNAL_ERROR_LOG_MESSAGE) + + if outcome is base.Outcome.LOCAL_FAILURE: + self._pool.submit(act, outcome) + else: + def call_callbacks_and_act(callbacks, outcome): + for callback in callbacks: + callback_outcome = callable_util.call_logging_exceptions( + callback, _constants.TERMINATION_CALLBACK_EXCEPTION_LOG_MESSAGE, + outcome) + if callback_outcome.exception is not None: + outcome = base.Outcome.LOCAL_FAILURE + break + act(outcome) + + self._pool.submit( + callable_util.with_exceptions_logged( + call_callbacks_and_act, _constants.INTERNAL_ERROR_LOG_MESSAGE), + callbacks, outcome) + + def _terminate_and_notify(self, outcome): + self._terminate_internal_only(outcome) + self._expiration_manager.terminate() + + def _perhaps_complete(self): + if self._predicate( + self._emission_complete, self._transmission_complete, + self._reception_complete, self._ingestion_complete): + self._terminate_and_notify(base.Outcome.COMPLETED) + return True + else: + return False + + def is_active(self): + """See _interfaces.TerminationManager.is_active for specification.""" + return self.outcome is None + + def add_callback(self, callback): + """See _interfaces.TerminationManager.add_callback for specification.""" + if self.outcome is None: + self._callbacks.append(callback) + return None + else: + return self.outcome + + def emission_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._emission_complete = True + self._perhaps_complete() + + def transmission_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._transmission_complete = True + return self._perhaps_complete() + else: + return False + + def reception_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._reception_complete = True + self._perhaps_complete() + + def ingestion_complete(self): + """See superclass method for specification.""" + if self.outcome is None: + self._ingestion_complete = True + self._perhaps_complete() + + def expire(self): + """See _interfaces.TerminationManager.expire for specification.""" + self._terminate_internal_only(base.Outcome.EXPIRED) + + def abort(self, outcome): + """See _interfaces.TerminationManager.abort for specification.""" + self._terminate_and_notify(outcome) + + +def invocation_termination_manager(action, pool): + """Creates a TerminationManager appropriate for invocation-side use. + + Args: + action: An action to call on operation termination. + pool: A thread pool in which to execute the passed action and any + termination callbacks that are registered during the operation. + + Returns: + A TerminationManager appropriate for invocation-side use. + """ + return _TerminationManager(_invocation_completion_predicate, action, pool) + + +def service_termination_manager(action, pool): + """Creates a TerminationManager appropriate for service-side use. + + Args: + action: An action to call on operation termination. + pool: A thread pool in which to execute the passed action and any + termination callbacks that are registered during the operation. + + Returns: + A TerminationManager appropriate for service-side use. + """ + return _TerminationManager(_service_completion_predicate, action, pool) diff --git a/src/python/grpcio/grpc/framework/core/_transmission.py b/src/python/grpcio/grpc/framework/core/_transmission.py new file mode 100644 index 0000000000..01894d398d --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_transmission.py @@ -0,0 +1,294 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""State and behavior for ticket transmission during an operation.""" + +from grpc.framework.core import _constants +from grpc.framework.core import _interfaces +from grpc.framework.foundation import callable_util +from grpc.framework.interfaces.base import base +from grpc.framework.interfaces.links import links + +_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' + + +def _explode_completion(completion): + if completion is None: + return None, None, None, None + else: + return ( + completion.terminal_metadata, completion.code, completion.message, + links.Ticket.Termination.COMPLETION) + + +class TransmissionManager(_interfaces.TransmissionManager): + """An _interfaces.TransmissionManager that sends links.Tickets.""" + + def __init__( + self, operation_id, ticket_sink, lock, pool, termination_manager): + """Constructor. + + Args: + operation_id: The operation's ID. + ticket_sink: A callable that accepts tickets and sends them to the other + side of the operation. + lock: The operation-servicing-wide lock object. + pool: A thread pool in which the work of transmitting tickets will be + performed. + termination_manager: The _interfaces.TerminationManager associated with + this operation. + """ + self._lock = lock + self._pool = pool + self._ticket_sink = ticket_sink + self._operation_id = operation_id + self._termination_manager = termination_manager + self._expiration_manager = None + + self._lowest_unused_sequence_number = 0 + self._remote_allowance = 1 + self._remote_complete = False + self._timeout = None + self._local_allowance = 0 + self._initial_metadata = None + self._payloads = [] + self._completion = None + self._aborted = False + self._abortion_outcome = None + self._transmitting = False + + def set_expiration_manager(self, expiration_manager): + """Sets the ExpirationManager with which this manager will cooperate.""" + self._expiration_manager = expiration_manager + + def _next_ticket(self): + """Creates the next ticket to be transmitted. + + Returns: + A links.Ticket to be sent to the other side of the operation or None if + there is nothing to be sent at this time. + """ + if self._aborted: + if self._abortion_outcome is None: + return None + else: + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ + self._abortion_outcome] + if termination is None: + return None + else: + self._abortion_outcome = None + return links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, + None, None, None, None, None, None, None, None, None, + termination) + + action = False + # TODO(nathaniel): Support other subscriptions. + local_subscription = links.Ticket.Subscription.FULL + timeout = self._timeout + if timeout is not None: + self._timeout = None + action = True + if self._local_allowance <= 0: + allowance = None + else: + allowance = self._local_allowance + self._local_allowance = 0 + action = True + initial_metadata = self._initial_metadata + if initial_metadata is not None: + self._initial_metadata = None + action = True + if not self._payloads or self._remote_allowance <= 0: + payload = None + else: + payload = self._payloads.pop(0) + self._remote_allowance -= 1 + action = True + if self._completion is None or self._payloads: + terminal_metadata, code, message, termination = None, None, None, None + else: + terminal_metadata, code, message, termination = _explode_completion( + self._completion) + self._completion = None + action = True + + if action: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + local_subscription, timeout, allowance, initial_metadata, payload, + terminal_metadata, code, message, termination) + self._lowest_unused_sequence_number += 1 + return ticket + else: + return None + + def _transmit(self, ticket): + """Commences the transmission loop sending tickets. + + Args: + ticket: A links.Ticket to be sent to the other side of the operation. + """ + def transmit(ticket): + while True: + transmission_outcome = callable_util.call_logging_exceptions( + self._ticket_sink, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket) + if transmission_outcome.exception is None: + with self._lock: + if ticket.termination is links.Ticket.Termination.COMPLETION: + self._termination_manager.transmission_complete() + ticket = self._next_ticket() + if ticket is None: + self._transmitting = False + return + else: + with self._lock: + if self._termination_manager.outcome is None: + self._termination_manager.abort(base.Outcome.TRANSMISSION_FAILURE) + self._expiration_manager.terminate() + return + + self._pool.submit(callable_util.with_exceptions_logged( + transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket) + self._transmitting = True + + def kick_off( + self, group, method, timeout, initial_metadata, payload, completion, + allowance): + """See _interfaces.TransmissionManager.kickoff for specification.""" + # TODO(nathaniel): Support other subscriptions. + subscription = links.Ticket.Subscription.FULL + terminal_metadata, code, message, termination = _explode_completion( + completion) + self._remote_allowance = 1 if payload is None else 0 + ticket = links.Ticket( + self._operation_id, 0, group, method, subscription, timeout, allowance, + initial_metadata, payload, terminal_metadata, code, message, + termination) + self._lowest_unused_sequence_number = 1 + self._transmit(ticket) + + def advance(self, initial_metadata, payload, completion, allowance): + """See _interfaces.TransmissionManager.advance for specification.""" + effective_initial_metadata = initial_metadata + effective_payload = payload + effective_completion = completion + if allowance is not None and not self._remote_complete: + effective_allowance = allowance + else: + effective_allowance = None + if self._transmitting: + if effective_initial_metadata is not None: + self._initial_metadata = effective_initial_metadata + if effective_payload is not None: + self._payloads.append(effective_payload) + if effective_completion is not None: + self._completion = effective_completion + if effective_allowance is not None: + self._local_allowance += effective_allowance + else: + if effective_payload is not None: + if 0 < self._remote_allowance: + ticket_payload = effective_payload + self._remote_allowance -= 1 + else: + self._payloads.append(effective_payload) + ticket_payload = None + else: + ticket_payload = None + if effective_completion is not None and not self._payloads: + ticket_completion = effective_completion + else: + self._completion = effective_completion + ticket_completion = None + if any( + (effective_initial_metadata, ticket_payload, ticket_completion, + effective_allowance)): + terminal_metadata, code, message, termination = _explode_completion( + completion) + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, None, allowance, effective_initial_metadata, ticket_payload, + terminal_metadata, code, message, termination) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def timeout(self, timeout): + """See _interfaces.TransmissionManager.timeout for specification.""" + if self._transmitting: + self._timeout = timeout + else: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, timeout, None, None, None, None, None, None, None) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def allowance(self, allowance): + """See _interfaces.TransmissionManager.allowance for specification.""" + if self._transmitting or not self._payloads: + self._remote_allowance += allowance + else: + self._remote_allowance += allowance - 1 + payload = self._payloads.pop(0) + if self._payloads: + completion = None + else: + completion = self._completion + self._completion = None + terminal_metadata, code, message, termination = _explode_completion( + completion) + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, None, + None, None, None, None, payload, terminal_metadata, code, message, + termination) + self._lowest_unused_sequence_number += 1 + self._transmit(ticket) + + def remote_complete(self): + """See _interfaces.TransmissionManager.remote_complete for specification.""" + self._remote_complete = True + self._local_allowance = 0 + + def abort(self, outcome): + """See _interfaces.TransmissionManager.abort for specification.""" + if self._transmitting: + self._aborted, self._abortion_outcome = True, outcome + else: + self._aborted = True + if outcome is not None: + termination = _constants.ABORTION_OUTCOME_TO_TICKET_TERMINATION[ + outcome] + if termination is not None: + ticket = links.Ticket( + self._operation_id, self._lowest_unused_sequence_number, None, + None, None, None, None, None, None, None, None, None, + termination) + self._transmit(ticket) diff --git a/src/python/grpcio/grpc/framework/core/_utilities.py b/src/python/grpcio/grpc/framework/core/_utilities.py new file mode 100644 index 0000000000..5b0d798751 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/_utilities.py @@ -0,0 +1,46 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Package-internal utilities.""" + +import collections + + +class ServicerPackage( + collections.namedtuple( + 'ServicerPackage', ('servicer', 'default_timeout', 'maximum_timeout'))): + """A trivial bundle class. + + Attributes: + servicer: A base.Servicer. + default_timeout: A float indicating the length of time in seconds to allow + for an operation invoked without a timeout. + maximum_timeout: A float indicating the maximum length of time in seconds to + allow for an operation. + """ diff --git a/src/python/grpcio/grpc/framework/core/implementations.py b/src/python/grpcio/grpc/framework/core/implementations.py new file mode 100644 index 0000000000..364a7faed4 --- /dev/null +++ b/src/python/grpcio/grpc/framework/core/implementations.py @@ -0,0 +1,62 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into the ticket-exchange-based base layer implementation.""" + +# base and links are referenced from specification in this module. +from grpc.framework.core import _end +from grpc.framework.interfaces.base import base # pylint: disable=unused-import +from grpc.framework.interfaces.links import links # pylint: disable=unused-import + + +def invocation_end_link(): + """Creates a base.End-links.Link suitable for operation invocation. + + Returns: + An object that is both a base.End and a links.Link, that supports operation + invocation, and that translates operation invocation into ticket exchange. + """ + return _end.serviceless_end_link() + + +def service_end_link(servicer, default_timeout, maximum_timeout): + """Creates a base.End-links.Link suitable for operation service. + + Args: + servicer: A base.Servicer for servicing operations. + default_timeout: A length of time in seconds to be used as the default + time alloted for a single operation. + maximum_timeout: A length of time in seconds to be used as the maximum + time alloted for a single operation. + + Returns: + An object that is both a base.End and a links.Link and that services + operations that arrive at it through ticket exchange. + """ + return _end.serviceful_end_link(servicer, default_timeout, maximum_timeout) diff --git a/src/python/grpcio/grpc/framework/interfaces/base/base.py b/src/python/grpcio/grpc/framework/interfaces/base/base.py index 9d1651daac..76e0a5bdae 100644 --- a/src/python/grpcio/grpc/framework/interfaces/base/base.py +++ b/src/python/grpcio/grpc/framework/interfaces/base/base.py @@ -27,10 +27,20 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""The base interface of RPC Framework.""" +"""The base interface of RPC Framework. +Implementations of this interface support the conduct of "operations": +exchanges between two distinct ends of an arbitrary number of data payloads +and metadata such as a name for the operation, initial and terminal metadata +in each direction, and flow control. These operations may be used for transfers +of data, remote procedure calls, status indication, or anything else +applications choose. +""" + +# threading is referenced from specification in this module. import abc import enum +import threading # abandonment is referenced from specification in this module. from grpc.framework.foundation import abandonment # pylint: disable=unused-import @@ -208,19 +218,26 @@ class End(object): raise NotImplementedError() @abc.abstractmethod - def stop_gracefully(self): - """Gracefully stops this object's service of operations. + def stop(self, grace): + """Stops this object's service of operations. - Operations in progress will be allowed to complete, and this method blocks - until all of them have. - """ - raise NotImplementedError() + This object will refuse service of new operations as soon as this method is + called but operations under way at the time of the call may be given a + grace period during which they are allowed to finish. - @abc.abstractmethod - def stop_immediately(self): - """Immediately stops this object's service of operations. + Args: + grace: A duration of time in seconds to allow ongoing operations to + terminate before being forcefully terminated by the stopping of this + End. May be zero to terminate all ongoing operations and immediately + stop. - Operations in progress will not be allowed to complete. + Returns: + A threading.Event that will be set to indicate all operations having + terminated and this End having completely stopped. The returned event + may not be set until after the full grace period (if some ongoing + operation continues for the full length of the period) or it may be set + much sooner (if for example this End had no operations in progress at + the time its stop method was called). """ raise NotImplementedError() diff --git a/src/python/grpcio/grpc/framework/interfaces/links/links.py b/src/python/grpcio/grpc/framework/interfaces/links/links.py index 5ebbac8a6f..069ff024dd 100644 --- a/src/python/grpcio/grpc/framework/interfaces/links/links.py +++ b/src/python/grpcio/grpc/framework/interfaces/links/links.py @@ -98,7 +98,7 @@ class Ticket( COMPLETION = 'completion' CANCELLATION = 'cancellation' EXPIRATION = 'expiration' - LOCAL_SHUTDOWN = 'local shutdown' + SHUTDOWN = 'shutdown' RECEPTION_FAILURE = 'reception failure' TRANSMISSION_FAILURE = 'transmission failure' LOCAL_FAILURE = 'local failure' diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py new file mode 100644 index 0000000000..72b1ae5642 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py @@ -0,0 +1,165 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests the RPC Framework Core's implementation of the Base interface.""" + +import collections +import logging +import random +import time +import unittest + +from grpc._adapter import _intermediary_low +from grpc._links import invocation +from grpc._links import service +from grpc.framework.core import implementations +from grpc.framework.interfaces.base import utilities +from grpc_test import test_common as grpc_test_common +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import test_cases +from grpc_test.framework.interfaces.base import test_interfaces + +_INVOCATION_INITIAL_METADATA = ((b'0', b'abc'), (b'1', b'def'), (b'2', b'ghi'),) +_SERVICE_INITIAL_METADATA = ((b'3', b'jkl'), (b'4', b'mno'), (b'5', b'pqr'),) +_SERVICE_TERMINAL_METADATA = ((b'6', b'stu'), (b'7', b'vwx'), (b'8', b'yza'),) +_CODE = _intermediary_low.Code.OK +_MESSAGE = b'test message' + + +class _SerializationBehaviors( + collections.namedtuple( + '_SerializationBehaviors', + ('request_serializers', 'request_deserializers', 'response_serializers', + 'response_deserializers',))): + pass + + +class _Links( + collections.namedtuple( + '_Links', + ('invocation_end_link', 'invocation_grpc_link', 'service_grpc_link', + 'service_end_link'))): + pass + + +def _serialization_behaviors_from_serializations(serializations): + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + for (group, method), serialization in serializations.iteritems(): + request_serializers[group, method] = serialization.serialize_request + request_deserializers[group, method] = serialization.deserialize_request + response_serializers[group, method] = serialization.serialize_response + response_deserializers[group, method] = serialization.deserialize_response + return _SerializationBehaviors( + request_serializers, request_deserializers, response_serializers, + response_deserializers) + + +class _Implementation(test_interfaces.Implementation): + + def instantiate(self, serializations, servicer): + serialization_behaviors = _serialization_behaviors_from_serializations( + serializations) + invocation_end_link = implementations.invocation_end_link() + service_end_link = implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + service_grpc_link = service.service_link( + serialization_behaviors.request_deserializers, + serialization_behaviors.response_serializers) + port = service_grpc_link.add_port(0, None) + channel = _intermediary_low.Channel('localhost:%d' % port, None) + invocation_grpc_link = invocation.invocation_link( + channel, b'localhost', + serialization_behaviors.request_serializers, + serialization_behaviors.response_deserializers) + + invocation_end_link.join_link(invocation_grpc_link) + invocation_grpc_link.join_link(invocation_end_link) + service_end_link.join_link(service_grpc_link) + service_grpc_link.join_link(service_end_link) + invocation_grpc_link.start() + service_grpc_link.start() + return invocation_end_link, service_end_link, ( + invocation_grpc_link, service_grpc_link) + + def destantiate(self, memo): + invocation_grpc_link, service_grpc_link = memo + invocation_grpc_link.stop() + service_grpc_link.stop_gracefully() + + def invocation_initial_metadata(self): + return _INVOCATION_INITIAL_METADATA + + def service_initial_metadata(self): + return _SERVICE_INITIAL_METADATA + + def invocation_completion(self): + return utilities.completion(None, None, None) + + def service_completion(self): + return utilities.completion(_SERVICE_TERMINAL_METADATA, _CODE, _MESSAGE) + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return original_metadata is None or grpc_test_common.metadata_transmitted( + original_metadata, transmitted_metadata) + + def completion_transmitted(self, original_completion, transmitted_completion): + if (original_completion.terminal_metadata is not None and + not grpc_test_common.metadata_transmitted( + original_completion.terminal_metadata, + transmitted_completion.terminal_metadata)): + return False + elif original_completion.code is not transmitted_completion.code: + return False + elif original_completion.message != transmitted_completion.message: + return False + else: + return True + + +def setUpModule(): + logging.warn('setUpModule!') + + +def tearDownModule(): + logging.warn('tearDownModule!') + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/core/__init__.py b/src/python/grpcio_test/grpc_test/framework/core/__init__.py new file mode 100644 index 0000000000..7086519106 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/core/__init__.py @@ -0,0 +1,30 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + diff --git a/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py new file mode 100644 index 0000000000..8d72f131d5 --- /dev/null +++ b/src/python/grpcio_test/grpc_test/framework/core/_base_interface_test.py @@ -0,0 +1,96 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Tests the RPC Framework Core's implementation of the Base interface.""" + +import logging +import random +import time +import unittest + +from grpc.framework.core import implementations +from grpc.framework.interfaces.base import utilities +from grpc_test.framework.common import test_constants +from grpc_test.framework.interfaces.base import test_cases +from grpc_test.framework.interfaces.base import test_interfaces + + +class _Implementation(test_interfaces.Implementation): + + def __init__(self): + self._invocation_initial_metadata = object() + self._service_initial_metadata = object() + self._invocation_terminal_metadata = object() + self._service_terminal_metadata = object() + + def instantiate(self, serializations, servicer): + invocation = implementations.invocation_end_link() + service = implementations.service_end_link( + servicer, test_constants.DEFAULT_TIMEOUT, + test_constants.MAXIMUM_TIMEOUT) + invocation.join_link(service) + service.join_link(invocation) + return invocation, service, None + + def destantiate(self, memo): + pass + + def invocation_initial_metadata(self): + return self._invocation_initial_metadata + + def service_initial_metadata(self): + return self._service_initial_metadata + + def invocation_completion(self): + return utilities.completion(self._invocation_terminal_metadata, None, None) + + def service_completion(self): + return utilities.completion(self._service_terminal_metadata, None, None) + + def metadata_transmitted(self, original_metadata, transmitted_metadata): + return transmitted_metadata is original_metadata + + def completion_transmitted(self, original_completion, transmitted_completion): + return ( + (original_completion.terminal_metadata is + transmitted_completion.terminal_metadata) and + original_completion.code is transmitted_completion.code and + original_completion.message is transmitted_completion.message + ) + + +def load_tests(loader, tests, pattern): + return unittest.TestSuite( + tests=tuple( + loader.loadTestsFromTestCase(test_case_class) + for test_case_class in test_cases.test_cases(_Implementation()))) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py index dd332fe5dd..5c8b176da4 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/base/test_cases.py @@ -211,8 +211,10 @@ class _OperationTest(unittest.TestCase): elif instruction.kind is _control.Instruction.Kind.CONCLUDE: break - invocation_end.stop_gracefully() - service_end.stop_gracefully() + invocation_stop_event = invocation_end.stop(0) + service_stop_event = service_end.stop(0) + invocation_stop_event.wait() + service_stop_event.wait() invocation_stats = invocation_end.operation_stats() service_stats = service_end.operation_stats() diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py index 6c2e3346aa..a2bd7107c1 100644 --- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py +++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_utilities.py @@ -29,9 +29,42 @@ """State and behavior appropriate for use in tests.""" +import logging import threading +import time from grpc.framework.interfaces.links import links +from grpc.framework.interfaces.links import utilities + +# A more-or-less arbitrary limit on the length of raw data values to be logged. +_UNCOMFORTABLY_LONG = 48 + + +def _safe_for_log_ticket(ticket): + """Creates a safe-for-printing-to-the-log ticket for a given ticket. + + Args: + ticket: Any links.Ticket. + + Returns: + A links.Ticket that is as much as can be equal to the given ticket but + possibly features values like the string "<payload of length 972321>" in + place of the actual values of the given ticket. + """ + if isinstance(ticket.payload, (basestring,)): + payload_length = len(ticket.payload) + else: + payload_length = -1 + if payload_length < _UNCOMFORTABLY_LONG: + return ticket + else: + return links.Ticket( + ticket.operation_id, ticket.sequence_number, + ticket.group, ticket.method, ticket.subscription, ticket.timeout, + ticket.allowance, ticket.initial_metadata, + '<payload of length {}>'.format(payload_length), + ticket.terminal_metadata, ticket.code, ticket.message, + ticket.termination) class RecordingLink(links.Link): @@ -64,3 +97,71 @@ class RecordingLink(links.Link): """Returns a copy of the list of all tickets received by this Link.""" with self._condition: return tuple(self._tickets) + + +class _Pipe(object): + """A conduit that logs all tickets passed through it.""" + + def __init__(self, name): + self._lock = threading.Lock() + self._name = name + self._left_mate = utilities.NULL_LINK + self._right_mate = utilities.NULL_LINK + + def accept_left_to_right_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving left to right through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._right_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def accept_right_to_left_ticket(self, ticket): + with self._lock: + logging.warning( + '%s: moving right to left through %s: %s', time.time(), self._name, + _safe_for_log_ticket(ticket)) + try: + self._left_mate.accept_ticket(ticket) + except Exception as e: # pylint: disable=broad-except + logging.exception(e) + + def join_left_mate(self, left_mate): + with self._lock: + self._left_mate = utilities.NULL_LINK if left_mate is None else left_mate + + def join_right_mate(self, right_mate): + with self._lock: + self._right_mate = ( + utilities.NULL_LINK if right_mate is None else right_mate) + + +class _Facade(links.Link): + + def __init__(self, accept, join): + self._accept = accept + self._join = join + + def accept_ticket(self, ticket): + self._accept(ticket) + + def join_link(self, link): + self._join(link) + + +def logging_links(name): + """Creates a conduit that logs all tickets passed through it. + + Args: + name: A name to use for the conduit to identify itself in logging output. + + Returns: + Two links.Links, the first of which is the "left" side of the conduit + and the second of which is the "right" side of the conduit. + """ + pipe = _Pipe(name) + left_facade = _Facade(pipe.accept_left_to_right_ticket, pipe.join_left_mate) + right_facade = _Facade(pipe.accept_right_to_left_ticket, pipe.join_right_mate) + return left_facade, right_facade diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 36c6818a7e..6b5beb6f5d 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -82,6 +82,10 @@ static ID id_metadata; * received by the call and subsequently saved on it. */ static ID id_status; +/* id_write_flag is name of the attribute used to access the write_flag + * saved on the call. */ +static ID id_write_flag; + /* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */ static VALUE sym_send_message; static VALUE sym_send_metadata; @@ -240,6 +244,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) { return rb_ivar_set(self, id_metadata, metadata); } +/* + call-seq: + write_flag = call.write_flag + + Gets the write_flag value saved the call. */ +static VALUE grpc_rb_call_get_write_flag(VALUE self) { + return rb_ivar_get(self, id_write_flag); +} + +/* + call-seq: + call.write_flag = write_flag + + Saves the write_flag on the call. */ +static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) { + if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) { + rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>", + rb_obj_classname(write_flag)); + return Qnil; + } + + return rb_ivar_set(self, id_write_flag, write_flag); +} + /* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used to fill grpc_metadata_array. @@ -437,17 +465,19 @@ typedef struct run_batch_stack { grpc_status_code recv_status; char *recv_status_details; size_t recv_status_details_capacity; + uint write_flag; } run_batch_stack; /* grpc_run_batch_stack_init ensures the run_batch_stack is properly * initialized */ -static void grpc_run_batch_stack_init(run_batch_stack *st) { +static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) { MEMZERO(st, run_batch_stack, 1); grpc_metadata_array_init(&st->send_metadata); grpc_metadata_array_init(&st->send_trailing_metadata); grpc_metadata_array_init(&st->recv_metadata); grpc_metadata_array_init(&st->recv_trailing_metadata); st->op_num = 0; + st->write_flag = write_flag; } /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly @@ -477,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) { this_op = rb_ary_entry(ops_ary, i); this_value = rb_hash_aref(ops_hash, this_op); + st->ops[st->op_num].flags = 0; switch (NUM2INT(this_op)) { case GRPC_OP_SEND_INITIAL_METADATA: /* N.B. later there is no need to explicitly delete the metadata keys @@ -490,6 +521,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { case GRPC_OP_SEND_MESSAGE: st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer( RSTRING_PTR(this_value), RSTRING_LEN(this_value)); + st->ops[st->op_num].flags = st->write_flag; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: break; @@ -525,7 +557,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { NUM2INT(this_op)); }; st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op); - st->ops[st->op_num].flags = 0; st->ops[st->op_num].reserved = NULL; st->op_num++; } @@ -604,6 +635,8 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, grpc_event ev; grpc_call_error err; VALUE result = Qnil; + VALUE rb_write_flag = rb_ivar_get(self, id_write_flag); + uint write_flag = 0; TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); /* Validate the ops args, adding them to a ruby array */ @@ -611,7 +644,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash"); return Qnil; } - grpc_run_batch_stack_init(&st); + if (rb_write_flag != Qnil) { + write_flag = NUM2UINT(rb_write_flag); + } + grpc_run_batch_stack_init(&st, write_flag); grpc_run_batch_stack_fill_ops(&st, ops_hash); /* call grpc_call_start_batch, then wait for it to complete using @@ -638,6 +674,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, return result; } +static void Init_grpc_write_flags() { + /* Constants representing the write flags in grpc.h */ + VALUE grpc_rb_mWriteFlags = + rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags"); + rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT", + UINT2NUM(GRPC_WRITE_BUFFER_HINT)); + rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS", + UINT2NUM(GRPC_WRITE_NO_COMPRESS)); +} + static void Init_grpc_error_codes() { /* Constants representing the error codes of grpc_call_error in grpc.h */ VALUE grpc_rb_mRpcErrors = @@ -735,10 +781,14 @@ void Init_grpc_call() { rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1); rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0); rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1); + rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0); + rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag, + 1); /* Ids used to support call attributes */ id_metadata = rb_intern("metadata"); id_status = rb_intern("status"); + id_write_flag = rb_intern("write_flag"); /* Ids used by the c wrapping internals. */ id_cq = rb_intern("__cq"); @@ -766,6 +816,7 @@ void Init_grpc_call() { Init_grpc_error_codes(); Init_grpc_op_codes(); + Init_grpc_write_flags(); } /* Gets the call from the ruby object */ diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 17da401c6b..d9cb924735 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -59,7 +59,7 @@ module GRPC include Core::CallOps extend Forwardable attr_reader(:deadline) - def_delegators :@call, :cancel, :metadata + def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag= # client_invoke begins a client invocation. # @@ -484,6 +484,7 @@ module GRPC # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status, :start_call, :wait) + :metadata, :status, :start_call, :wait, :write_flag, + :write_flag=) end end diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index 3c5d33ffcd..dd3c45f754 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -31,6 +31,14 @@ require 'grpc' include GRPC::Core::StatusCodes +describe GRPC::Core::WriteFlags do + it 'should define the known write flag values' do + m = GRPC::Core::WriteFlags + expect(m.const_get(:BUFFER_HINT)).to_not be_nil + expect(m.const_get(:NO_COMPRESS)).to_not be_nil + end +end + describe GRPC::Core::RpcErrors do before(:each) do @known_types = { diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 26208b714a..fcd7bd082f 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -35,6 +35,7 @@ describe GRPC::ActiveCall do ActiveCall = GRPC::ActiveCall Call = GRPC::Core::Call CallOps = GRPC::Core::CallOps + WriteFlags = GRPC::Core::WriteFlags before(:each) do @pass_through = proc { |x| x } @@ -129,6 +130,31 @@ describe GRPC::ActiveCall do @pass_through, deadline) expect(server_call.remote_read).to eq('marshalled:' + msg) end + + TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS] + TEST_WRITE_FLAGS.each do |f| + it "successfully makes calls with write_flag set to #{f}" do + call = make_test_call + ActiveCall.client_invoke(call, @client_queue) + marshal = proc { |x| 'marshalled:' + x } + client_call = ActiveCall.new(call, @client_queue, marshal, + @pass_through, deadline) + msg = 'message is a string' + client_call.write_flag = f + client_call.remote_send(msg) + + # confirm that the message was marshalled + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + recvd_call = recvd_rpc.call + server_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through, + @pass_through, deadline) + expect(server_call.remote_read).to eq('marshalled:' + msg) + end + end end describe '#client_invoke' do @@ -261,7 +287,7 @@ describe GRPC::ActiveCall do client_call.writes_done(false) server_call = expect_server_to_receive(msg) e = client_call.each_remote_read - n = 3 # arbitrary value > 1 + n = 3 # arbitrary value > 1 n.times do server_call.remote_send(reply) expect(e.next).to eq(reply) |