diff options
Diffstat (limited to 'src')
60 files changed, 866 insertions, 316 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c index 20d723bbc1..762a4edc73 100644 --- a/src/core/channel/compress_filter.c +++ b/src/core/channel/compress_filter.c @@ -216,7 +216,7 @@ static void process_send_ops(grpc_call_element *elem, [calld->compression_algorithm])); /* convey supported compression algorithms */ - grpc_metadata_batch_add_head( + grpc_metadata_batch_add_tail( &(sop->data.metadata), &calld->accept_encoding_storage, GRPC_MDELEM_REF(channeld->mdelem_accept_encoding)); diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 4990e6583a..1c402418e8 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -975,6 +975,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport, grpc_transport_perform_op(transport, &op); } +void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) { + (void) done_arg; + gpr_free(storage); +} + void grpc_server_shutdown_and_notify(grpc_server *server, grpc_completion_queue *cq, void *tag) { listener *l; @@ -986,6 +991,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* lock, and gather up some stuff to do */ gpr_mu_lock(&server->mu_global); grpc_cq_begin_op(cq); + if (server->shutdown_published) { + grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL, + gpr_malloc(sizeof(grpc_cq_completion))); + gpr_mu_unlock(&server->mu_global); + return; + } server->shutdown_tags = gpr_realloc(server->shutdown_tags, sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)); diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc index 9695a0f14b..17f31c22cb 100644 --- a/src/cpp/client/channel.cc +++ b/src/cpp/client/channel.cc @@ -71,7 +71,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context, } else { const char* host_str = NULL; if (!context->authority().empty()) { - host_str = context->authority().c_str(); + host_str = context->authority_.c_str(); } else if (!host_.empty()) { host_str = host_.c_str(); } diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 27472f4880..e039c07374 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -90,6 +90,26 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { return mrd; } + static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok, + gpr_timespec deadline) { + void* tag = nullptr; + *ok = false; + switch (cq->AsyncNext(&tag, ok, deadline)) { + case CompletionQueue::TIMEOUT: + *req = nullptr; + return true; + case CompletionQueue::SHUTDOWN: + *req = nullptr; + return false; + case CompletionQueue::GOT_EVENT: + *req = static_cast<SyncRequest*>(tag); + GPR_ASSERT((*req)->in_flight_); + return true; + } + gpr_log(GPR_ERROR, "Should never reach here"); + abort(); + } + void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } void TeardownRequest() { @@ -303,12 +323,27 @@ bool Server::Start() { return true; } -void Server::Shutdown() { +void Server::ShutdownInternal(gpr_timespec deadline) { grpc::unique_lock<grpc::mutex> lock(mu_); if (started_ && !shutdown_) { shutdown_ = true; grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); + // Spin, eating requests until the completion queue is completely shutdown. + // If the deadline expires then cancel anything that's pending and keep + // spinning forever until the work is actually drained. + // Since nothing else needs to touch state guarded by mu_, holding it + // through this loop is fine. + SyncRequest* request; + bool ok; + while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) { + if (request == NULL) { // deadline expired + grpc_server_cancel_all_calls(server_); + deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + } else if (ok) { + SyncRequest::CallData call_data(this, request); + } + } // Wait for running callbacks to finish. while (num_running_cb_ != 0) { diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs index 2787572924..dfbd92879e 100644 --- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs +++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs @@ -41,12 +41,6 @@ namespace Grpc.Core.Tests { public class ChannelTest { - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void Constructor_RejectsInvalidParams() { @@ -56,36 +50,33 @@ namespace Grpc.Core.Tests [Test] public void State_IdleAfterCreation() { - using (var channel = new Channel("localhost", Credentials.Insecure)) - { - Assert.AreEqual(ChannelState.Idle, channel.State); - } + var channel = new Channel("localhost", Credentials.Insecure); + Assert.AreEqual(ChannelState.Idle, channel.State); + channel.ShutdownAsync().Wait(); } [Test] public void WaitForStateChangedAsync_InvalidArgument() { - using (var channel = new Channel("localhost", Credentials.Insecure)) - { - Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); - } + var channel = new Channel("localhost", Credentials.Insecure); + Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure)); + channel.ShutdownAsync().Wait(); } [Test] public void ResolvedTarget() { - using (var channel = new Channel("127.0.0.1", Credentials.Insecure)) - { - Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1")); - } + var channel = new Channel("127.0.0.1", Credentials.Insecure); + Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1")); + channel.ShutdownAsync().Wait(); } [Test] - public void Dispose_IsIdempotent() + public void Shutdown_AllowedOnlyOnce() { var channel = new Channel("localhost", Credentials.Insecure); - channel.Dispose(); - channel.Dispose(); + channel.ShutdownAsync().Wait(); + Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult()); } } } diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs index e49fdb5268..68279a2007 100644 --- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs @@ -63,16 +63,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public async Task UnaryCall() { @@ -208,13 +202,6 @@ namespace Grpc.Core.Tests } [Test] - public void UnaryCall_DisposedChannel() - { - channel.Dispose(); - Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC")); - } - - [Test] public void UnaryCallPerformance() { helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) => diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs index 9547683f60..378c81851c 100644 --- a/src/csharp/Grpc.Core.Tests/CompressionTest.cs +++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs @@ -62,16 +62,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void WriteOptions_Unary() { diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs index db5f953b0e..2db3f286f7 100644 --- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs +++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs @@ -62,16 +62,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public async Task PropagateCancellation() { diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj index d6a8f52570..ad4e94a695 100644 --- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj +++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj @@ -64,6 +64,7 @@ <Link>Version.cs</Link> </Compile> <Compile Include="ClientBaseTest.cs" /> + <Compile Include="ShutdownTest.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="ClientServerTest.cs" /> <Compile Include="ServerTest.cs" /> @@ -82,6 +83,7 @@ <Compile Include="ResponseHeadersTest.cs" /> <Compile Include="CompressionTest.cs" /> <Compile Include="ContextPropagationTest.cs" /> + <Compile Include="MetadataTest.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs index 4ed93c7eca..4fdfab5a99 100644 --- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs +++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs @@ -43,34 +43,40 @@ namespace Grpc.Core.Tests [Test] public void InitializeAndShutdownGrpcEnvironment() { - var env = GrpcEnvironment.GetInstance(); + var env = GrpcEnvironment.AddRef(); Assert.IsNotNull(env.CompletionQueue); - GrpcEnvironment.Shutdown(); + GrpcEnvironment.Release(); } [Test] public void SubsequentInvocations() { - var env1 = GrpcEnvironment.GetInstance(); - var env2 = GrpcEnvironment.GetInstance(); + var env1 = GrpcEnvironment.AddRef(); + var env2 = GrpcEnvironment.AddRef(); Assert.IsTrue(object.ReferenceEquals(env1, env2)); - GrpcEnvironment.Shutdown(); - GrpcEnvironment.Shutdown(); + GrpcEnvironment.Release(); + GrpcEnvironment.Release(); } [Test] public void InitializeAfterShutdown() { - var env1 = GrpcEnvironment.GetInstance(); - GrpcEnvironment.Shutdown(); + var env1 = GrpcEnvironment.AddRef(); + GrpcEnvironment.Release(); - var env2 = GrpcEnvironment.GetInstance(); - GrpcEnvironment.Shutdown(); + var env2 = GrpcEnvironment.AddRef(); + GrpcEnvironment.Release(); Assert.IsFalse(object.ReferenceEquals(env1, env2)); } [Test] + public void ReleaseWithoutAddRef() + { + Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release()); + } + + [Test] public void GetCoreVersionString() { var coreVersion = GrpcEnvironment.GetCoreVersionString(); diff --git a/src/csharp/Grpc.Core.Tests/MetadataTest.cs b/src/csharp/Grpc.Core.Tests/MetadataTest.cs new file mode 100644 index 0000000000..c00f945d6a --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/MetadataTest.cs @@ -0,0 +1,120 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class MetadataTest + { + [Test] + public void AsciiEntry() + { + var entry = new Metadata.Entry("ABC", "XYZ"); + Assert.IsFalse(entry.IsBinary); + Assert.AreEqual("abc", entry.Key); // key is in lowercase. + Assert.AreEqual("XYZ", entry.Value); + CollectionAssert.AreEqual(new[] { (byte)'X', (byte)'Y', (byte)'Z' }, entry.ValueBytes); + + Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc-bin", "xyz")); + + Assert.AreEqual("[Entry: key=abc, value=XYZ]", entry.ToString()); + } + + [Test] + public void BinaryEntry() + { + var bytes = new byte[] { 1, 2, 3 }; + var entry = new Metadata.Entry("ABC-BIN", bytes); + Assert.IsTrue(entry.IsBinary); + Assert.AreEqual("abc-bin", entry.Key); // key is in lowercase. + Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; }); + CollectionAssert.AreEqual(bytes, entry.ValueBytes); + + Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc", bytes)); + + Assert.AreEqual("[Entry: key=abc-bin, valueBytes=System.Byte[]]", entry.ToString()); + } + + [Test] + public void Entry_ConstructionPreconditions() + { + Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry(null, "xyz")); + Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc", (string)null)); + Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc-bin", (byte[])null)); + } + + [Test] + public void Entry_Immutable() + { + var origBytes = new byte[] { 1, 2, 3 }; + var bytes = new byte[] { 1, 2, 3 }; + var entry = new Metadata.Entry("ABC-BIN", bytes); + bytes[0] = 255; // changing the array passed to constructor should have any effect. + CollectionAssert.AreEqual(origBytes, entry.ValueBytes); + + entry.ValueBytes[0] = 255; + CollectionAssert.AreEqual(origBytes, entry.ValueBytes); + } + + [Test] + public void Entry_CreateUnsafe_Ascii() + { + var bytes = new byte[] { (byte)'X', (byte)'y' }; + var entry = Metadata.Entry.CreateUnsafe("abc", bytes); + Assert.IsFalse(entry.IsBinary); + Assert.AreEqual("abc", entry.Key); + Assert.AreEqual("Xy", entry.Value); + CollectionAssert.AreEqual(bytes, entry.ValueBytes); + } + + [Test] + public void Entry_CreateUnsafe_Binary() + { + var bytes = new byte[] { 1, 2, 3 }; + var entry = Metadata.Entry.CreateUnsafe("abc-bin", bytes); + Assert.IsTrue(entry.IsBinary); + Assert.AreEqual("abc-bin", entry.Key); + Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; }); + CollectionAssert.AreEqual(bytes, entry.ValueBytes); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs index 981b8ea3c8..706006702e 100644 --- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs +++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs @@ -69,16 +69,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void WriteResponseHeaders_NullNotAllowed() { diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs index 485006ebac..e7193c843b 100644 --- a/src/csharp/Grpc.Core.Tests/ServerTest.cs +++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs @@ -51,7 +51,6 @@ namespace Grpc.Core.Tests }; server.Start(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] @@ -67,8 +66,7 @@ namespace Grpc.Core.Tests Assert.Greater(boundPort.BoundPort, 0); server.Start(); - server.ShutdownAsync(); - GrpcEnvironment.Shutdown(); + server.ShutdownAsync().Wait(); } [Test] @@ -83,7 +81,6 @@ namespace Grpc.Core.Tests Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build())); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } } } diff --git a/src/csharp/Grpc.Core.Tests/ShutdownTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs new file mode 100644 index 0000000000..a2be7ddd5e --- /dev/null +++ b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs @@ -0,0 +1,77 @@ +#region Copyright notice and license + +// Copyright 2015, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Internal; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Grpc.Core.Tests +{ + public class ShutdownTest + { + const string Host = "127.0.0.1"; + + MockServiceHelper helper; + Server server; + Channel channel; + + [SetUp] + public void Init() + { + helper = new MockServiceHelper(Host); + server = helper.GetServer(); + server.Start(); + channel = helper.GetChannel(); + } + + [Test] + public async Task AbandonedCall() + { + helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) => + { + await requestStream.ToListAsync(); + }); + + var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1)))); + + channel.ShutdownAsync().Wait(); + server.ShutdownAsync().Wait(); + } + } +} diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs index d875d601b9..41f661f62d 100644 --- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs +++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs @@ -65,16 +65,10 @@ namespace Grpc.Core.Tests [TearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); } - [TestFixtureTearDown] - public void CleanupClass() - { - GrpcEnvironment.Shutdown(); - } - [Test] public void InfiniteDeadline() { diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs index 64c6adf2bf..2f8519dfa3 100644 --- a/src/csharp/Grpc.Core/Channel.cs +++ b/src/csharp/Grpc.Core/Channel.cs @@ -45,14 +45,19 @@ namespace Grpc.Core /// <summary> /// gRPC Channel /// </summary> - public class Channel : IDisposable + public class Channel { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>(); + readonly object myLock = new object(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly string target; readonly GrpcEnvironment environment; readonly ChannelSafeHandle handle; readonly List<ChannelOption> options; + + bool shutdownRequested; bool disposed; /// <summary> @@ -65,7 +70,7 @@ namespace Grpc.Core public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null) { this.target = Preconditions.CheckNotNull(target, "target"); - this.environment = GrpcEnvironment.GetInstance(); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); EnsureUserAgentChannelOption(this.options); @@ -172,12 +177,26 @@ namespace Grpc.Core } /// <summary> - /// Destroys the underlying channel. + /// Waits until there are no more active calls for this channel and then cleans up + /// resources used by this channel. /// </summary> - public void Dispose() + public async Task ShutdownAsync() { - Dispose(true); - GC.SuppressFinalize(this); + lock (myLock) + { + Preconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount); + } + + handle.Dispose(); + + await Task.Run(() => GrpcEnvironment.Release()); } internal ChannelSafeHandle Handle @@ -196,13 +215,20 @@ namespace Grpc.Core } } - protected virtual void Dispose(bool disposing) + internal void AddCallReference(object call) { - if (disposing && handle != null && !disposed) - { - disposed = true; - handle.Dispose(); - } + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + + activeCallCounter.Decrement(); } private static void EnsureUserAgentChannelOption(List<ChannelOption> options) diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs index 7bc100ca60..903449439b 100644 --- a/src/csharp/Grpc.Core/ClientBase.cs +++ b/src/csharp/Grpc.Core/ClientBase.cs @@ -119,7 +119,8 @@ namespace Grpc.Core internal static string GetAuthUriBase(string target) { var match = ChannelTargetPattern.Match(target); - if (!match.Success) { + if (!match.Success) + { return null; } return "https://" + match.Groups[2].Value + "/"; diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs index 2e4bfc9e47..a5bf1b5a70 100644 --- a/src/csharp/Grpc.Core/ContextPropagationToken.cs +++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs @@ -132,7 +132,6 @@ namespace Grpc.Core bool propagateDeadline; bool propagateCancellation; - /// <summary> /// Creates new context propagation options. /// </summary> diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs index 30d8c80235..0a44eead74 100644 --- a/src/csharp/Grpc.Core/GrpcEnvironment.cs +++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs @@ -58,6 +58,7 @@ namespace Grpc.Core static object staticLock = new object(); static GrpcEnvironment instance; + static int refCount; static ILogger logger = new ConsoleLogger(); @@ -67,13 +68,14 @@ namespace Grpc.Core bool isClosed; /// <summary> - /// Returns an instance of initialized gRPC environment. - /// Subsequent invocations return the same instance unless Shutdown has been called first. + /// Returns a reference-counted instance of initialized gRPC environment. + /// Subsequent invocations return the same instance unless reference count has dropped to zero previously. /// </summary> - internal static GrpcEnvironment GetInstance() + internal static GrpcEnvironment AddRef() { lock (staticLock) { + refCount++; if (instance == null) { instance = new GrpcEnvironment(); @@ -83,14 +85,16 @@ namespace Grpc.Core } /// <summary> - /// Shuts down the gRPC environment if it was initialized before. - /// Blocks until the environment has been fully shutdown. + /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero. + /// (and blocks until the environment has been fully shutdown). /// </summary> - public static void Shutdown() + internal static void Release() { lock (staticLock) { - if (instance != null) + Preconditions.CheckState(refCount > 0); + refCount--; + if (refCount == 0) { instance.Close(); instance = null; @@ -125,12 +129,10 @@ namespace Grpc.Core private GrpcEnvironment() { NativeLogRedirector.Redirect(); - grpcsharp_init(); + GrpcNativeInit(); completionRegistry = new CompletionRegistry(this); threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE); threadPool.Start(); - // TODO: use proper logging here - Logger.Info("gRPC initialized."); } /// <summary> @@ -175,6 +177,17 @@ namespace Grpc.Core return Marshal.PtrToStringAnsi(ptr); } + + internal static void GrpcNativeInit() + { + grpcsharp_init(); + } + + internal static void GrpcNativeShutdown() + { + grpcsharp_shutdown(); + } + /// <summary> /// Shuts down this environment. /// </summary> @@ -185,12 +198,10 @@ namespace Grpc.Core throw new InvalidOperationException("Close has already been called"); } threadPool.Stop(); - grpcsharp_shutdown(); + GrpcNativeShutdown(); isClosed = true; debugStats.CheckOK(); - - Logger.Info("gRPC shutdown."); } } } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 2c3e3d75ea..bb9ba5b8dd 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -311,9 +311,9 @@ namespace Grpc.Core.Internal } } - protected override void OnReleaseResources() + protected override void OnAfterReleaseResources() { - details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement(); + details.Channel.RemoveCallReference(this); } private void Initialize(CompletionQueueSafeHandle cq) @@ -323,7 +323,9 @@ namespace Grpc.Core.Internal var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry, parentCall, ContextPropagationToken.DefaultMask, cq, details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value)); - details.Channel.Environment.DebugStats.ActiveClientCalls.Increment(); + + details.Channel.AddCallReference(this); + InitializeInternal(call); RegisterCancellationCallback(); } diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs index 6ca4bbdafc..1808294f43 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs @@ -189,15 +189,15 @@ namespace Grpc.Core.Internal private void ReleaseResources() { - OnReleaseResources(); if (call != null) { call.Dispose(); } disposed = true; + OnAfterReleaseResources(); } - protected virtual void OnReleaseResources() + protected virtual void OnAfterReleaseResources() { } @@ -212,7 +212,7 @@ namespace Grpc.Core.Internal Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time"); } - protected void CheckReadingAllowed() + protected virtual void CheckReadingAllowed() { Preconditions.CheckState(started); Preconditions.CheckState(!disposed); diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs index 3710a65d6b..6278c0191e 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs @@ -50,16 +50,19 @@ namespace Grpc.Core.Internal readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>(); readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); readonly GrpcEnvironment environment; + readonly Server server; - public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer) + public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer) { this.environment = Preconditions.CheckNotNull(environment); + this.server = Preconditions.CheckNotNull(server); } public void Initialize(CallSafeHandle call) { call.SetCompletionRegistry(environment.CompletionRegistry); - environment.DebugStats.ActiveServerCalls.Increment(); + + server.AddCallReference(this); InitializeInternal(call); } @@ -168,9 +171,15 @@ namespace Grpc.Core.Internal } } - protected override void OnReleaseResources() + protected override void CheckReadingAllowed() + { + base.CheckReadingAllowed(); + Preconditions.CheckArgument(!cancelRequested); + } + + protected override void OnAfterReleaseResources() { - environment.DebugStats.ActiveServerCalls.Decrement(); + server.RemoveCallReference(this); } /// <summary> diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs index 6a2add54db..3a96414bea 100644 --- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs @@ -134,7 +134,7 @@ namespace Grpc.Core.Internal } // Gets data of server_rpc_new completion. - public ServerRpcNew GetServerRpcNew() + public ServerRpcNew GetServerRpcNew(Server server) { var call = grpcsharp_batch_context_server_rpc_new_call(this); @@ -145,7 +145,7 @@ namespace Grpc.Core.Internal IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this); var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr); - return new ServerRpcNew(call, method, host, deadline, metadata); + return new ServerRpcNew(server, call, method, host, deadline, metadata); } // Gets data of receive_close_on_server completion. @@ -198,14 +198,16 @@ namespace Grpc.Core.Internal /// </summary> internal struct ServerRpcNew { + readonly Server server; readonly CallSafeHandle call; readonly string method; readonly string host; readonly Timespec deadline; readonly Metadata requestMetadata; - public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) + public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata) { + this.server = server; this.call = call; this.method = method; this.host = host; @@ -213,6 +215,14 @@ namespace Grpc.Core.Internal this.requestMetadata = requestMetadata; } + public Server Server + { + get + { + return this.server; + } + } + public CallSafeHandle Call { get diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs index 7f03bf4ea5..8cef566c14 100644 --- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs @@ -68,11 +68,17 @@ namespace Grpc.Core.Internal public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_insecure_channel_create(target, channelArgs); } public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_secure_channel_create(credentials, target, channelArgs); } @@ -107,6 +113,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_channel_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } } diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs index 8793450ff3..1bea1adf9e 100644 --- a/src/csharp/Grpc.Core/Internal/DebugStats.cs +++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs @@ -38,10 +38,6 @@ namespace Grpc.Core.Internal { internal class DebugStats { - public readonly AtomicCounter ActiveClientCalls = new AtomicCounter(); - - public readonly AtomicCounter ActiveServerCalls = new AtomicCounter(); - public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter(); /// <summary> @@ -49,16 +45,6 @@ namespace Grpc.Core.Internal /// </summary> public void CheckOK() { - var remainingClientCalls = ActiveClientCalls.Count; - if (remainingClientCalls != 0) - { - DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls)); - } - var remainingServerCalls = ActiveServerCalls.Count; - if (remainingServerCalls != 0) - { - DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls)); - } var pendingBatchCompletions = PendingBatchCompletions.Count; if (pendingBatchCompletions != 0) { diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs index cb4c7c821e..4b7124ee74 100644 --- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs +++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs @@ -83,8 +83,6 @@ namespace Grpc.Core.Internal lock (myLock) { cq.Shutdown(); - - Logger.Info("Waiting for GRPC threads to finish."); foreach (var thread in threads) { thread.Join(); @@ -136,7 +134,6 @@ namespace Grpc.Core.Internal } } while (ev.type != GRPCCompletionType.Shutdown); - Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name); } } } diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs index 427c16fac6..83994f6762 100644 --- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs @@ -70,7 +70,8 @@ namespace Grpc.Core.Internal var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count)); for (int i = 0; i < metadata.Count; i++) { - grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, metadata[i].ValueBytes, new UIntPtr((ulong)metadata[i].ValueBytes.Length)); + var valueBytes = metadata[i].GetSerializedValueUnsafe(); + grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length)); } return metadataArray; } @@ -94,7 +95,7 @@ namespace Grpc.Core.Internal string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index)); var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()]; Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length); - metadata.Add(new Metadata.Entry(key, bytes)); + metadata.Add(Metadata.Entry.CreateUnsafe(key, bytes)); } return metadata; } diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs index 688f9f6fec..59f4c5727c 100644 --- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs +++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs @@ -67,7 +67,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -123,7 +123,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -179,7 +179,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -239,7 +239,7 @@ namespace Grpc.Core.Internal var asyncCall = new AsyncCallServer<TRequest, TResponse>( method.ResponseMarshaller.Serializer, method.RequestMarshaller.Deserializer, - environment); + environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); @@ -278,7 +278,7 @@ namespace Grpc.Core.Internal { // We don't care about the payload type here. var asyncCall = new AsyncCallServer<byte[], byte[]>( - (payload) => payload, (payload) => payload, environment); + (payload) => payload, (payload) => payload, environment, newRpc.Server); asyncCall.Initialize(newRpc.Call); var finishedTask = asyncCall.ServerSideCallAsync(); diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index f9b44b1acf..5ee7ac14e8 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -74,6 +74,9 @@ namespace Grpc.Core.Internal public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args) { + // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle. + // Doing so would make object finalizer crash if we end up abandoning the handle. + GrpcEnvironment.GrpcNativeInit(); return grpcsharp_server_create(cq, args); } @@ -109,6 +112,7 @@ namespace Grpc.Core.Internal protected override bool ReleaseHandle() { grpcsharp_server_destroy(handle); + GrpcEnvironment.GrpcNativeShutdown(); return true; } diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs index 382481d871..35561d25d8 100644 --- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs +++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs @@ -51,7 +51,19 @@ namespace Grpc.Core.Logging private ConsoleLogger(Type forType) { this.forType = forType; - this.forTypeString = forType != null ? forType.FullName + " " : ""; + if (forType != null) + { + var namespaceStr = forType.Namespace ?? ""; + if (namespaceStr.Length > 0) + { + namespaceStr += "."; + } + this.forTypeString = namespaceStr + forType.Name + " "; + } + else + { + this.forTypeString = ""; + } } /// <summary> diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs index 9db2abf46e..a589b50caa 100644 --- a/src/csharp/Grpc.Core/Metadata.cs +++ b/src/csharp/Grpc.Core/Metadata.cs @@ -46,6 +46,11 @@ namespace Grpc.Core public sealed class Metadata : IList<Metadata.Entry> { /// <summary> + /// All binary headers should have this suffix. + /// </summary> + public const string BinaryHeaderSuffix = "-bin"; + + /// <summary> /// An read-only instance of metadata containing no entries. /// </summary> public static readonly Metadata Empty = new Metadata().Freeze(); @@ -181,23 +186,49 @@ namespace Grpc.Core private static readonly Encoding Encoding = Encoding.ASCII; readonly string key; - string value; - byte[] valueBytes; + readonly string value; + readonly byte[] valueBytes; + + private Entry(string key, string value, byte[] valueBytes) + { + this.key = key; + this.value = value; + this.valueBytes = valueBytes; + } + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct with a binary value. + /// </summary> + /// <param name="key">Metadata key, needs to have suffix indicating a binary valued metadata entry.</param> + /// <param name="valueBytes">Value bytes.</param> public Entry(string key, byte[] valueBytes) { - this.key = Preconditions.CheckNotNull(key, "key"); + this.key = NormalizeKey(key); + Preconditions.CheckArgument(this.key.EndsWith(BinaryHeaderSuffix), + "Key for binary valued metadata entry needs to have suffix indicating binary value."); this.value = null; - this.valueBytes = Preconditions.CheckNotNull(valueBytes, "valueBytes"); + Preconditions.CheckNotNull(valueBytes, "valueBytes"); + this.valueBytes = new byte[valueBytes.Length]; + Buffer.BlockCopy(valueBytes, 0, this.valueBytes, 0, valueBytes.Length); // defensive copy to guarantee immutability } + /// <summary> + /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct holding an ASCII value. + /// </summary> + /// <param name="key">Metadata key, must not use suffix indicating a binary valued metadata entry.</param> + /// <param name="value">Value string. Only ASCII characters are allowed.</param> public Entry(string key, string value) { - this.key = Preconditions.CheckNotNull(key, "key"); + this.key = NormalizeKey(key); + Preconditions.CheckArgument(!this.key.EndsWith(BinaryHeaderSuffix), + "Key for ASCII valued metadata entry cannot have suffix indicating binary value."); this.value = Preconditions.CheckNotNull(value, "value"); this.valueBytes = null; } + /// <summary> + /// Gets the metadata entry key. + /// </summary> public string Key { get @@ -206,33 +237,86 @@ namespace Grpc.Core } } + /// <summary> + /// Gets the binary value of this metadata entry. + /// </summary> public byte[] ValueBytes { get { if (valueBytes == null) { - valueBytes = Encoding.GetBytes(value); + return Encoding.GetBytes(value); } - return valueBytes; + + // defensive copy to guarantee immutability + var bytes = new byte[valueBytes.Length]; + Buffer.BlockCopy(valueBytes, 0, bytes, 0, valueBytes.Length); + return bytes; } } + /// <summary> + /// Gets the string value of this metadata entry. + /// </summary> public string Value { get { - if (value == null) - { - value = Encoding.GetString(valueBytes); - } - return value; + Preconditions.CheckState(!IsBinary, "Cannot access string value of a binary metadata entry"); + return value ?? Encoding.GetString(valueBytes); } } - + + /// <summary> + /// Returns <c>true</c> if this entry is a binary-value entry. + /// </summary> + public bool IsBinary + { + get + { + return value == null; + } + } + + /// <summary> + /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Metadata+Entry"/>. + /// </summary> public override string ToString() { - return string.Format("[Entry: key={0}, value={1}]", Key, Value); + if (IsBinary) + { + return string.Format("[Entry: key={0}, valueBytes={1}]", key, valueBytes); + } + + return string.Format("[Entry: key={0}, value={1}]", key, value); + } + + /// <summary> + /// Gets the serialized value for this entry. For binary metadata entries, this leaks + /// the internal <c>valueBytes</c> byte array and caller must not change contents of it. + /// </summary> + internal byte[] GetSerializedValueUnsafe() + { + return valueBytes ?? Encoding.GetBytes(value); + } + + /// <summary> + /// Creates a binary value or ascii value metadata entry from data received from the native layer. + /// We trust C core to give us well-formed data, so we don't perform any checks or defensive copying. + /// </summary> + internal static Entry CreateUnsafe(string key, byte[] valueBytes) + { + if (key.EndsWith(BinaryHeaderSuffix)) + { + return new Entry(key, null, valueBytes); + } + return new Entry(key, Encoding.GetString(valueBytes), null); + } + + private static string NormalizeKey(string key) + { + return Preconditions.CheckNotNull(key, "key").ToLower(); } } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index c76f126026..28f1686e20 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -50,6 +50,8 @@ namespace Grpc.Core { static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>(); + readonly AtomicCounter activeCallCounter = new AtomicCounter(); + readonly ServiceDefinitionCollection serviceDefinitions; readonly ServerPortCollection ports; readonly GrpcEnvironment environment; @@ -73,7 +75,7 @@ namespace Grpc.Core { this.serviceDefinitions = new ServiceDefinitionCollection(this); this.ports = new ServerPortCollection(this); - this.environment = GrpcEnvironment.GetInstance(); + this.environment = GrpcEnvironment.AddRef(); this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>(); using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options)) { @@ -106,6 +108,17 @@ namespace Grpc.Core } /// <summary> + /// To allow awaiting termination of the server. + /// </summary> + public Task ShutdownTask + { + get + { + return shutdownTcs.Task; + } + } + + /// <summary> /// Starts the server. /// </summary> public void Start() @@ -136,18 +149,9 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); await shutdownTcs.Task; - handle.Dispose(); - } + DisposeHandle(); - /// <summary> - /// To allow awaiting termination of the server. - /// </summary> - public Task ShutdownTask - { - get - { - return shutdownTcs.Task; - } + await Task.Run(() => GrpcEnvironment.Release()); } /// <summary> @@ -166,7 +170,22 @@ namespace Grpc.Core handle.ShutdownAndNotify(HandleServerShutdown, environment); handle.CancelAllCalls(); await shutdownTcs.Task; - handle.Dispose(); + DisposeHandle(); + } + + internal void AddCallReference(object call) + { + activeCallCounter.Increment(); + + bool success = false; + handle.DangerousAddRef(ref success); + Preconditions.CheckState(success); + } + + internal void RemoveCallReference(object call) + { + handle.DangerousRelease(); + activeCallCounter.Decrement(); } /// <summary> @@ -227,6 +246,16 @@ namespace Grpc.Core } } + private void DisposeHandle() + { + var activeCallCount = activeCallCounter.Count; + if (activeCallCount > 0) + { + Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount); + } + handle.Dispose(); + } + /// <summary> /// Selects corresponding handler for given call and handles the call. /// </summary> @@ -254,7 +283,7 @@ namespace Grpc.Core { if (success) { - ServerRpcNew newRpc = ctx.GetServerRpcNew(); + ServerRpcNew newRpc = ctx.GetServerRpcNew(this); // after server shutdown, the callback returns with null call if (!newRpc.Call.IsInvalid) diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs index f9839d99f1..abd95cb905 100644 --- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs +++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs @@ -39,23 +39,21 @@ namespace math { public static void Main(string[] args) { - using (Channel channel = new Channel("127.0.0.1", 23456, Credentials.Insecure)) - { - Math.IMathClient client = new Math.MathClient(channel); - MathExamples.DivExample(client); + var channel = new Channel("127.0.0.1", 23456, Credentials.Insecure); + Math.IMathClient client = new Math.MathClient(channel); + MathExamples.DivExample(client); - MathExamples.DivAsyncExample(client).Wait(); + MathExamples.DivAsyncExample(client).Wait(); - MathExamples.FibExample(client).Wait(); + MathExamples.FibExample(client).Wait(); - MathExamples.SumExample(client).Wait(); + MathExamples.SumExample(client).Wait(); - MathExamples.DivManyExample(client).Wait(); + MathExamples.DivManyExample(client).Wait(); - MathExamples.DependendRequestsExample(client).Wait(); - } + MathExamples.DependendRequestsExample(client).Wait(); - GrpcEnvironment.Shutdown(); + channel.ShutdownAsync().Wait(); } } } diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs index 5f7e717b0c..26bef646ec 100644 --- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs +++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs @@ -56,7 +56,6 @@ namespace math Console.ReadKey(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } } } diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs index fdef950f09..36c1c947bd 100644 --- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs +++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs @@ -68,9 +68,8 @@ namespace math.Tests [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs index 024377e216..80c35fb197 100644 --- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs +++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs @@ -71,10 +71,9 @@ namespace Grpc.HealthCheck.Tests [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index f4b0a1028f..24c22273fb 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -37,13 +37,15 @@ using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using Google.Apis.Auth.OAuth2; using Google.ProtocolBuffers; + using grpc.testing; using Grpc.Auth; using Grpc.Core; using Grpc.Core.Utils; + using NUnit.Framework; -using Google.Apis.Auth.OAuth2; namespace Grpc.IntegrationTesting { @@ -118,12 +120,10 @@ namespace Grpc.IntegrationTesting }; } - using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions)) - { - TestService.TestServiceClient client = new TestService.TestServiceClient(channel); - await RunTestCaseAsync(options.testCase, client); - } - GrpcEnvironment.Shutdown(); + var channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions); + TestService.TestServiceClient client = new TestService.TestServiceClient(channel); + await RunTestCaseAsync(options.testCase, client); + channel.ShutdownAsync().Wait(); } private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client) @@ -169,6 +169,9 @@ namespace Grpc.IntegrationTesting case "cancel_after_first_response": await RunCancelAfterFirstResponseAsync(client); break; + case "timeout_on_sleeping_server": + await RunTimeoutOnSleepingServerAsync(client); + break; case "benchmark_empty_unary": RunBenchmarkEmptyUnary(client); break; @@ -458,6 +461,29 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } + public static async Task RunTimeoutOnSleepingServerAsync(TestService.ITestServiceClient client) + { + Console.WriteLine("running timeout_on_sleeping_server"); + + var deadline = DateTime.UtcNow.AddMilliseconds(1); + using (var call = client.FullDuplexCall(deadline: deadline)) + { + try + { + await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder() + .SetPayload(CreateZerosPayload(27182)).Build()); + } + catch (InvalidOperationException) + { + // Deadline was reached before write has started. Eat the exception and continue. + } + + var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext()); + Assert.AreEqual(StatusCode.DeadlineExceeded, ex.Status.StatusCode); + } + Console.WriteLine("Passed!"); + } + // This is not an official interop test, but it's useful. public static void RunBenchmarkEmptyUnary(TestService.ITestServiceClient client) { diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs index 6fa721bc1c..f3158aeb45 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -75,9 +75,8 @@ namespace Grpc.IntegrationTesting [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] @@ -127,5 +126,11 @@ namespace Grpc.IntegrationTesting { await InteropClient.RunCancelAfterFirstResponseAsync(client); } + + [Test] + public async Task TimeoutOnSleepingServerAsync() + { + await InteropClient.RunTimeoutOnSleepingServerAsync(client); + } } } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs index 504fd11857..0cc8b2cde1 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs @@ -107,8 +107,6 @@ namespace Grpc.IntegrationTesting server.Start(); server.ShutdownTask.Wait(); - - GrpcEnvironment.Shutdown(); } private static ServerOptions ParseArguments(string[] args) diff --git a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs index 1c398eb84e..842795374f 100644 --- a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs @@ -85,9 +85,8 @@ namespace Grpc.IntegrationTesting [TestFixtureTearDown] public void Cleanup() { - channel.Dispose(); + channel.ShutdownAsync().Wait(); server.ShutdownAsync().Wait(); - GrpcEnvironment.Shutdown(); } [Test] diff --git a/src/node/README.md b/src/node/README.md index 7d3d8c7fa1..b6411537c7 100644 --- a/src/node/README.md +++ b/src/node/README.md @@ -5,11 +5,35 @@ Alpha : Ready for early adopters ## PREREQUISITES - `node`: This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package. -- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core. +- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core. ## INSTALLATION -On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. -Run the following command to install gRPC Node.js. + +**Linux (Debian):** + +Add [Debian unstable][] to your `sources.list` file. Example: + +```sh +echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ +sudo tee -a /etc/apt/sources.list +``` + +Install the gRPC Debian package + +```sh +sudo apt-get update +sudo apt-get install libgrpc-dev +``` + +Install the gRPC NPM package + +```sh +npm install grpc +``` + +**Mac OS X** + +Install [homebrew][]. Run the following command to install gRPC Node.js. ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs ``` @@ -88,5 +112,5 @@ ServerCredentials An object with factory methods for creating credential objects for servers. [homebrew]:http://brew.sh -[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install +[Debian unstable]:https://www.debian.org/releases/sid/ diff --git a/src/node/health_check/health.js b/src/node/health_check/health.js index 87e00197fe..84d7e0568e 100644 --- a/src/node/health_check/health.js +++ b/src/node/health_check/health.js @@ -45,17 +45,13 @@ function HealthImplementation(statusMap) { this.statusMap = _.clone(statusMap); } -HealthImplementation.prototype.setStatus = function(host, service, status) { - if (!this.statusMap[host]) { - this.statusMap[host] = {}; - } - this.statusMap[host][service] = status; +HealthImplementation.prototype.setStatus = function(service, status) { + this.statusMap[service] = status; }; HealthImplementation.prototype.check = function(call, callback){ - var host = call.request.host; var service = call.request.service; - var status = _.get(this.statusMap, [host, service], null); + var status = _.get(this.statusMap, service, null); if (status === null) { callback({code:grpc.status.NOT_FOUND}); } else { diff --git a/src/node/health_check/health.proto b/src/node/health_check/health.proto index d31df1e0a7..57f4aaa9c0 100644 --- a/src/node/health_check/health.proto +++ b/src/node/health_check/health.proto @@ -32,8 +32,7 @@ syntax = "proto3"; package grpc.health.v1alpha; message HealthCheckRequest { - string host = 1; - string service = 2; + string service = 1; } message HealthCheckResponse { @@ -47,4 +46,4 @@ message HealthCheckResponse { service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); -}
\ No newline at end of file +} diff --git a/src/node/test/health_test.js b/src/node/test/health_test.js index be4ef1d251..04959f5f55 100644 --- a/src/node/test/health_test.js +++ b/src/node/test/health_test.js @@ -41,13 +41,9 @@ var grpc = require('../'); describe('Health Checking', function() { var statusMap = { - '': { - '': 'SERVING', - 'grpc.test.TestService': 'NOT_SERVING', - }, - virtual_host: { - 'grpc.test.TestService': 'SERVING' - } + '': 'SERVING', + 'grpc.test.TestServiceNotServing': 'NOT_SERVING', + 'grpc.test.TestServiceServing': 'SERVING' }; var healthServer = new grpc.Server(); healthServer.addProtoService(health.service, @@ -71,15 +67,15 @@ describe('Health Checking', function() { }); }); it('should say that a disabled service is NOT_SERVING', function(done) { - healthClient.check({service: 'grpc.test.TestService'}, + healthClient.check({service: 'grpc.test.TestServiceNotServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'NOT_SERVING'); done(); }); }); - it('should say that a service on another host is SERVING', function(done) { - healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'}, + it('should say that an enabled service is SERVING', function(done) { + healthClient.check({service: 'grpc.test.TestServiceServing'}, function(err, response) { assert.ifError(err); assert.strictEqual(response.status, 'SERVING'); @@ -93,12 +89,4 @@ describe('Health Checking', function() { done(); }); }); - it('should get NOT_FOUND if the host is not registered', function(done) { - healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'}, - function(err, response) { - assert(err); - assert.strictEqual(err.code, grpc.status.NOT_FOUND); - done(); - }); - }); }); diff --git a/src/php/README.md b/src/php/README.md index 1804606e09..f432935fde 100644 --- a/src/php/README.md +++ b/src/php/README.md @@ -7,17 +7,17 @@ This directory contains source code for PHP implementation of gRPC layered on sh Alpha : Ready for early adopters -## ENVIRONMENT +## Environment Prerequisite: PHP 5.5 or later, `phpunit`, `pecl` -Linux: +**Linux:** ```sh $ sudo apt-get install php5 php5-dev phpunit php-pear ``` -OS X: +**Mac OS X:** ```sh $ curl https://phar.phpunit.de/phpunit.phar -o phpunit.phar @@ -28,10 +28,39 @@ $ curl -O http://pear.php.net/go-pear.phar $ sudo php -d detect_unicode=0 go-pear.phar ``` -## Build from Homebrew +## Quick Install -On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. Run the following command to -install gRPC. +**Linux (Debian):** + +Add [Debian unstable][] to your `sources.list` file. Example: + +```sh +echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ +sudo tee -a /etc/apt/sources.list +``` + +Install the gRPC Debian package + +```sh +sudo apt-get update +sudo apt-get install libgrpc-dev +``` + +Install the gRPC PHP extension + +```sh +sudo pecl install grpc-alpha +``` + +**Mac OS X:** + +Install [homebrew][]. Example: + +```sh +ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" +``` + +Install the gRPC core library and the PHP extension in one step ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s php @@ -39,6 +68,7 @@ $ curl -fsSL https://goo.gl/getgrpc | bash -s php This will download and run the [gRPC install script][] and compile the gRPC PHP extension. + ## Build from Source Clone this repository @@ -71,7 +101,7 @@ $ sudo make install Install the gRPC PHP extension ```sh -$ sudo pecl install grpc +$ sudo pecl install grpc-alpha ``` OR @@ -140,6 +170,6 @@ $ ./bin/run_gen_code_test.sh ``` [homebrew]:http://brew.sh -[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install [Node]:https://github.com/grpc/grpc/tree/master/src/node/examples +[Debian unstable]:https://www.debian.org/releases/sid/ diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 4e40dc43ce..252623d0c3 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -216,13 +216,18 @@ PHP_METHOD(Call, __construct) { char *method; int method_len; zval *deadline_obj; - /* "OsO" == 1 Object, 1 string, 1 Object */ - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO", &channel_obj, - grpc_ce_channel, &method, &method_len, - &deadline_obj, grpc_ce_timeval) == FAILURE) { + char *host_override = NULL; + int host_override_len = 0; + /* "OsO|s" == 1 Object, 1 string, 1 Object, 1 optional string */ + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO|s", + &channel_obj, grpc_ce_channel, + &method, &method_len, + &deadline_obj, grpc_ce_timeval, + &host_override, &host_override_len) + == FAILURE) { zend_throw_exception( spl_ce_InvalidArgumentException, - "Call expects a Channel, a String, and a Timeval", + "Call expects a Channel, a String, a Timeval and an optional String", 1 TSRMLS_CC); return; } @@ -241,7 +246,7 @@ PHP_METHOD(Call, __construct) { deadline_obj TSRMLS_CC); call->wrapped = grpc_channel_create_call( channel->wrapped, NULL, GRPC_PROPAGATE_DEFAULTS, completion_queue, method, - channel->target, deadline->wrapped, NULL); + host_override, deadline->wrapped, NULL); } /** @@ -273,7 +278,6 @@ PHP_METHOD(Call, startBatch) { grpc_byte_buffer *message; int cancelled; grpc_call_error error; - grpc_event event; zval *result; char *message_str; size_t message_len; @@ -409,14 +413,8 @@ PHP_METHOD(Call, startBatch) { (long)error TSRMLS_CC); goto cleanup; } - event = grpc_completion_queue_pluck(completion_queue, call->wrapped, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); - if (!event.success) { - zend_throw_exception(spl_ce_LogicException, - "The batch failed for some reason", - 1 TSRMLS_CC); - goto cleanup; - } + grpc_completion_queue_pluck(completion_queue, call->wrapped, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); for (int i = 0; i < op_num; i++) { switch(ops[i].op) { case GRPC_OP_SEND_INITIAL_METADATA: diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c index f8ce04d902..7a981675de 100644 --- a/src/php/ext/grpc/channel.c +++ b/src/php/ext/grpc/channel.c @@ -64,7 +64,6 @@ void free_wrapped_grpc_channel(void *object TSRMLS_DC) { if (channel->wrapped != NULL) { grpc_channel_destroy(channel->wrapped); } - efree(channel->target); efree(channel); } @@ -141,9 +140,6 @@ PHP_METHOD(Channel, __construct) { HashTable *array_hash; zval **creds_obj = NULL; wrapped_grpc_credentials *creds = NULL; - zval **override_obj; - char *override; - int override_len; /* "s|a" == 1 string, 1 optional array */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a", &target, &target_length, &args_array) == FAILURE) { @@ -151,8 +147,6 @@ PHP_METHOD(Channel, __construct) { "Channel expects a string and an array", 1 TSRMLS_CC); return; } - override = target; - override_len = target_length; if (args_array == NULL) { channel->wrapped = grpc_insecure_channel_create(target, NULL, NULL); } else { @@ -169,19 +163,6 @@ PHP_METHOD(Channel, __construct) { *creds_obj TSRMLS_CC); zend_hash_del(array_hash, "credentials", 12); } - if (zend_hash_find(array_hash, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG, - sizeof(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG), - (void **)&override_obj) == SUCCESS) { - if (Z_TYPE_PP(override_obj) != IS_STRING) { - zend_throw_exception(spl_ce_InvalidArgumentException, - GRPC_SSL_TARGET_NAME_OVERRIDE_ARG - " must be a string", - 1 TSRMLS_CC); - return; - } - override = Z_STRVAL_PP(override_obj); - override_len = Z_STRLEN_PP(override_obj); - } php_grpc_read_args_array(args_array, &args); if (creds == NULL) { channel->wrapped = grpc_insecure_channel_create(target, &args, NULL); @@ -192,8 +173,6 @@ PHP_METHOD(Channel, __construct) { } efree(args.args); } - channel->target = ecalloc(override_len + 1, sizeof(char)); - memcpy(channel->target, override, override_len); } /** diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h index c13fa4c6d7..78a16ed0c9 100755 --- a/src/php/ext/grpc/channel.h +++ b/src/php/ext/grpc/channel.h @@ -53,7 +53,6 @@ typedef struct wrapped_grpc_channel { zend_object std; grpc_channel *wrapped; - char *target; } wrapped_grpc_channel; /* Initializes the Channel class */ diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index 8b7e67f57c..287621d930 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -39,6 +39,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { protected static $client; protected static $timeout; + public function testWaitForNotReady() { + $this->assertFalse(self::$client->waitForReady(1)); + } + + public function testWaitForReady() { + $this->assertTrue(self::$client->waitForReady(250000)); + } + public function testSimpleRequest() { $div_arg = new math\DivArgs(); $div_arg->setDividend(7); diff --git a/src/php/tests/generated_code/GeneratedCodeTest.php b/src/php/tests/generated_code/GeneratedCodeTest.php index 1e4742fc80..a1a2ce81db 100755 --- a/src/php/tests/generated_code/GeneratedCodeTest.php +++ b/src/php/tests/generated_code/GeneratedCodeTest.php @@ -35,7 +35,7 @@ require 'AbstractGeneratedCodeTest.php'; class GeneratedCodeTest extends AbstractGeneratedCodeTest { public static function setUpBeforeClass() { - self::$client = new math\MathClient(new Grpc\BaseStub( - getenv('GRPC_TEST_HOST'), [])); + self::$client = new math\MathClient( + getenv('GRPC_TEST_HOST'), []); } } diff --git a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php index f8ec1e7da8..68f57d34ad 100644 --- a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php +++ b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php @@ -35,13 +35,13 @@ require 'AbstractGeneratedCodeTest.php'; class GeneratedCodeWithCallbackTest extends AbstractGeneratedCodeTest { public static function setUpBeforeClass() { - self::$client = new math\MathClient(new Grpc\BaseStub( + self::$client = new math\MathClient( getenv('GRPC_TEST_HOST'), ['update_metadata' => function($a_hash, $client = array()) { $a_copy = $a_hash; $a_copy['foo'] = ['bar']; return $a_copy; - }])); + }]); } } diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index 44e6242c29..376d306da0 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -271,7 +271,7 @@ function cancelAfterFirstResponse($stub) { } function timeoutOnSleepingServer($stub) { - $call = $stub->FullDuplexCall(array('timeout' => 500000)); + $call = $stub->FullDuplexCall(array('timeout' => 1000)); $request = new grpc\testing\StreamingOutputCallRequest(); $request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE); $response_parameters = new grpc\testing\ResponseParameters(); diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index f91c006da5..60341b983d 100755 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php @@ -40,13 +40,15 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ file_get_contents(dirname(__FILE__) . '/../data/server1.key'), file_get_contents(dirname(__FILE__) . '/../data/server1.pem')); $this->server = new Grpc\Server(); - $port = $this->server->addSecureHttp2Port('0.0.0.0:0', + $this->port = $this->server->addSecureHttp2Port('0.0.0.0:0', $server_credentials); $this->server->start(); + $this->host_override = 'foo.test.google.fr'; $this->channel = new Grpc\Channel( - 'localhost:' . $port, + 'localhost:' . $this->port, [ - 'grpc.ssl_target_name_override' => 'foo.test.google.fr', + 'grpc.ssl_target_name_override' => $this->host_override, + 'grpc.default_authority' => $this->host_override, 'credentials' => $credentials ]); } @@ -61,7 +63,8 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $status_text = 'xyz'; $call = new Grpc\Call($this->channel, 'dummy_method', - $deadline); + $deadline, + $this->host_override); $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], @@ -112,7 +115,8 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{ $call = new Grpc\Call($this->channel, 'dummy_method', - $deadline); + $deadline, + $this->host_override); $event = $call->startBatch([ Grpc\OP_SEND_INITIAL_METADATA => [], diff --git a/src/python/README.md b/src/python/README.md index 2beb3a913a..de0142db05 100644 --- a/src/python/README.md +++ b/src/python/README.md @@ -9,12 +9,36 @@ Alpha : Ready for early adopters PREREQUISITES ------------- - Python 2.7, virtualenv, pip -- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core. +- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core. INSTALLATION ------------- -On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. -Run the following command to install gRPC Python. + +**Linux (Debian):** + +Add [Debian unstable][] to your `sources.list` file. Example: + +```sh +echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ +sudo tee -a /etc/apt/sources.list +``` + +Install the gRPC Debian package + +```sh +sudo apt-get update +sudo apt-get install libgrpc-dev +``` + +Install the gRPC Python module + +```sh +sudo pip install grpcio +``` + +**Mac OS X** + +Install [homebrew][]. Run the following command to install gRPC Python. ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s python ``` @@ -27,11 +51,6 @@ Please read our online documentation for a [Quick Start][] and a [detailed examp BUILDING FROM SOURCE --------------------- - Clone this repository -- Build the gRPC core from the root of the - [gRPC Git repository](https://github.com/grpc/grpc) -``` -$ make shared_c static_c -``` - Use build_python.sh to build the Python code and install it into a virtual environment ``` @@ -60,7 +79,7 @@ $ ../../tools/distrib/python/submit.py ``` [homebrew]:http://brew.sh -[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install [Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html [detailed example]:http://www.grpc.io/docs/installation/python.html +[Debian unstable]:https://www.debian.org/releases/sid/ diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst index 00bdecf56f..c7b5a3bde4 100644 --- a/src/python/grpcio/README.rst +++ b/src/python/grpcio/README.rst @@ -6,7 +6,7 @@ Package for GRPC Python. Dependencies ------------ -Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_. +Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. Run the following command to install gRPC Python. :: @@ -19,5 +19,4 @@ Otherwise, `install from source`_ .. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source .. _homebrew: http://brew.sh -.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation .. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install diff --git a/src/ruby/README.md b/src/ruby/README.md index 4b657c0bd4..f8902e34c5 100644 --- a/src/ruby/README.md +++ b/src/ruby/README.md @@ -12,12 +12,36 @@ PREREQUISITES ------------- - Ruby 2.x. The gRPC API uses keyword args. -- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core. +- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core. INSTALLATION --------------- -On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. -Run the following command to install gRPC Ruby. + +**Linux (Debian):** + +Add [Debian unstable][] to your `sources.list` file. Example: + +```sh +echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \ +sudo tee -a /etc/apt/sources.list +``` + +Install the gRPC Debian package + +```sh +sudo apt-get update +sudo apt-get install libgrpc-dev +``` + +Install the gRPC Ruby package + +```sh +gem install grpc +``` + +**Mac OS X** + +Install [homebrew][]. Run the following command to install gRPC Ruby. ```sh $ curl -fsSL https://goo.gl/getgrpc | bash -s ruby ``` @@ -26,12 +50,6 @@ This will download and run the [gRPC install script][], then install the latest BUILD FROM SOURCE --------------------- - Clone this repository -- Build the gRPC C core -E.g, from the root of the gRPC [Git repository](https://github.com/google/grpc) -```sh -$ cd ../.. -$ make && sudo make install -``` - Install Ruby 2.x. Consider doing this with [RVM](http://rvm.io), it's a nice way of controlling the exact ruby version that's used. @@ -77,8 +95,8 @@ Directory structure is the layout for [ruby extensions][] GRPC.logger.info("Answer: #{resp.inspect}") ``` [homebrew]:http://brew.sh -[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install [ruby extensions]:http://guides.rubygems.org/gems-with-extensions/ [rubydoc]: http://www.rubydoc.info/gems/grpc [grpc.io]: http://www.grpc.io/docs/installation/ruby.html +[Debian unstable]:https://www.debian.org/releases/sid/ diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 36c6818a7e..6b5beb6f5d 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -82,6 +82,10 @@ static ID id_metadata; * received by the call and subsequently saved on it. */ static ID id_status; +/* id_write_flag is name of the attribute used to access the write_flag + * saved on the call. */ +static ID id_write_flag; + /* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */ static VALUE sym_send_message; static VALUE sym_send_metadata; @@ -240,6 +244,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) { return rb_ivar_set(self, id_metadata, metadata); } +/* + call-seq: + write_flag = call.write_flag + + Gets the write_flag value saved the call. */ +static VALUE grpc_rb_call_get_write_flag(VALUE self) { + return rb_ivar_get(self, id_write_flag); +} + +/* + call-seq: + call.write_flag = write_flag + + Saves the write_flag on the call. */ +static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) { + if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) { + rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>", + rb_obj_classname(write_flag)); + return Qnil; + } + + return rb_ivar_set(self, id_write_flag, write_flag); +} + /* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used to fill grpc_metadata_array. @@ -437,17 +465,19 @@ typedef struct run_batch_stack { grpc_status_code recv_status; char *recv_status_details; size_t recv_status_details_capacity; + uint write_flag; } run_batch_stack; /* grpc_run_batch_stack_init ensures the run_batch_stack is properly * initialized */ -static void grpc_run_batch_stack_init(run_batch_stack *st) { +static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) { MEMZERO(st, run_batch_stack, 1); grpc_metadata_array_init(&st->send_metadata); grpc_metadata_array_init(&st->send_trailing_metadata); grpc_metadata_array_init(&st->recv_metadata); grpc_metadata_array_init(&st->recv_trailing_metadata); st->op_num = 0; + st->write_flag = write_flag; } /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly @@ -477,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) { this_op = rb_ary_entry(ops_ary, i); this_value = rb_hash_aref(ops_hash, this_op); + st->ops[st->op_num].flags = 0; switch (NUM2INT(this_op)) { case GRPC_OP_SEND_INITIAL_METADATA: /* N.B. later there is no need to explicitly delete the metadata keys @@ -490,6 +521,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { case GRPC_OP_SEND_MESSAGE: st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer( RSTRING_PTR(this_value), RSTRING_LEN(this_value)); + st->ops[st->op_num].flags = st->write_flag; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: break; @@ -525,7 +557,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) { NUM2INT(this_op)); }; st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op); - st->ops[st->op_num].flags = 0; st->ops[st->op_num].reserved = NULL; st->op_num++; } @@ -604,6 +635,8 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, grpc_event ev; grpc_call_error err; VALUE result = Qnil; + VALUE rb_write_flag = rb_ivar_get(self, id_write_flag); + uint write_flag = 0; TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); /* Validate the ops args, adding them to a ruby array */ @@ -611,7 +644,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash"); return Qnil; } - grpc_run_batch_stack_init(&st); + if (rb_write_flag != Qnil) { + write_flag = NUM2UINT(rb_write_flag); + } + grpc_run_batch_stack_init(&st, write_flag); grpc_run_batch_stack_fill_ops(&st, ops_hash); /* call grpc_call_start_batch, then wait for it to complete using @@ -638,6 +674,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, return result; } +static void Init_grpc_write_flags() { + /* Constants representing the write flags in grpc.h */ + VALUE grpc_rb_mWriteFlags = + rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags"); + rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT", + UINT2NUM(GRPC_WRITE_BUFFER_HINT)); + rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS", + UINT2NUM(GRPC_WRITE_NO_COMPRESS)); +} + static void Init_grpc_error_codes() { /* Constants representing the error codes of grpc_call_error in grpc.h */ VALUE grpc_rb_mRpcErrors = @@ -735,10 +781,14 @@ void Init_grpc_call() { rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1); rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0); rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1); + rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0); + rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag, + 1); /* Ids used to support call attributes */ id_metadata = rb_intern("metadata"); id_status = rb_intern("status"); + id_write_flag = rb_intern("write_flag"); /* Ids used by the c wrapping internals. */ id_cq = rb_intern("__cq"); @@ -766,6 +816,7 @@ void Init_grpc_call() { Init_grpc_error_codes(); Init_grpc_op_codes(); + Init_grpc_write_flags(); } /* Gets the call from the ruby object */ diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 17da401c6b..d9cb924735 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -59,7 +59,7 @@ module GRPC include Core::CallOps extend Forwardable attr_reader(:deadline) - def_delegators :@call, :cancel, :metadata + def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag= # client_invoke begins a client invocation. # @@ -484,6 +484,7 @@ module GRPC # Operation limits access to an ActiveCall's methods for use as # a Operation on the client. Operation = view_class(:cancel, :cancelled, :deadline, :execute, - :metadata, :status, :start_call, :wait) + :metadata, :status, :start_call, :wait, :write_flag, + :write_flag=) end end diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb index 3c5d33ffcd..dd3c45f754 100644 --- a/src/ruby/spec/call_spec.rb +++ b/src/ruby/spec/call_spec.rb @@ -31,6 +31,14 @@ require 'grpc' include GRPC::Core::StatusCodes +describe GRPC::Core::WriteFlags do + it 'should define the known write flag values' do + m = GRPC::Core::WriteFlags + expect(m.const_get(:BUFFER_HINT)).to_not be_nil + expect(m.const_get(:NO_COMPRESS)).to_not be_nil + end +end + describe GRPC::Core::RpcErrors do before(:each) do @known_types = { diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb index 26208b714a..fcd7bd082f 100644 --- a/src/ruby/spec/generic/active_call_spec.rb +++ b/src/ruby/spec/generic/active_call_spec.rb @@ -35,6 +35,7 @@ describe GRPC::ActiveCall do ActiveCall = GRPC::ActiveCall Call = GRPC::Core::Call CallOps = GRPC::Core::CallOps + WriteFlags = GRPC::Core::WriteFlags before(:each) do @pass_through = proc { |x| x } @@ -129,6 +130,31 @@ describe GRPC::ActiveCall do @pass_through, deadline) expect(server_call.remote_read).to eq('marshalled:' + msg) end + + TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS] + TEST_WRITE_FLAGS.each do |f| + it "successfully makes calls with write_flag set to #{f}" do + call = make_test_call + ActiveCall.client_invoke(call, @client_queue) + marshal = proc { |x| 'marshalled:' + x } + client_call = ActiveCall.new(call, @client_queue, marshal, + @pass_through, deadline) + msg = 'message is a string' + client_call.write_flag = f + client_call.remote_send(msg) + + # confirm that the message was marshalled + recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline) + recvd_call = recvd_rpc.call + server_ops = { + CallOps::SEND_INITIAL_METADATA => nil + } + recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops) + server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through, + @pass_through, deadline) + expect(server_call.remote_read).to eq('marshalled:' + msg) + end + end end describe '#client_invoke' do @@ -261,7 +287,7 @@ describe GRPC::ActiveCall do client_call.writes_done(false) server_call = expect_server_to_receive(msg) e = client_call.each_remote_read - n = 3 # arbitrary value > 1 + n = 3 # arbitrary value > 1 n.times do server_call.remote_send(reply) expect(e.next).to eq(reply) |