aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/compress_filter.c2
-rw-r--r--src/core/surface/server.c11
-rw-r--r--src/cpp/client/channel.cc2
-rw-r--r--src/cpp/server/server.cc37
-rw-r--r--src/csharp/Grpc.Core.Tests/ChannelTest.cs33
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs15
-rw-r--r--src/csharp/Grpc.Core.Tests/CompressionTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs26
-rw-r--r--src/csharp/Grpc.Core.Tests/MetadataTest.cs120
-rw-r--r--src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/ServerTest.cs5
-rw-r--r--src/csharp/Grpc.Core.Tests/ShutdownTest.cs77
-rw-r--r--src/csharp/Grpc.Core.Tests/TimeoutsTest.cs8
-rw-r--r--src/csharp/Grpc.Core/Channel.cs50
-rw-r--r--src/csharp/Grpc.Core/ClientBase.cs3
-rw-r--r--src/csharp/Grpc.Core/ContextPropagationToken.cs1
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs37
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs16
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs4
-rw-r--r--src/csharp/Grpc.Core/Logging/ConsoleLogger.cs14
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs112
-rw-r--r--src/csharp/Grpc.Core/Server.cs57
-rw-r--r--src/csharp/Grpc.Examples.MathClient/MathClient.cs20
-rw-r--r--src/csharp/Grpc.Examples.MathServer/MathServer.cs1
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs3
-rw-r--r--src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs40
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs9
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropServer.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs3
-rw-r--r--src/node/README.md32
-rw-r--r--src/node/ext/server_credentials.cc63
-rw-r--r--src/node/health_check/health.js10
-rw-r--r--src/node/health_check/health.proto5
-rw-r--r--src/node/interop/interop_server.js4
-rw-r--r--src/node/src/client.js8
-rw-r--r--src/node/test/health_test.js24
-rw-r--r--src/node/test/server_test.js4
-rw-r--r--src/php/README.md46
-rw-r--r--src/php/ext/grpc/call.c28
-rw-r--r--src/php/ext/grpc/channel.c21
-rwxr-xr-xsrc/php/ext/grpc/channel.h1
-rw-r--r--src/php/tests/generated_code/AbstractGeneratedCodeTest.php8
-rwxr-xr-xsrc/php/tests/generated_code/GeneratedCodeTest.php4
-rw-r--r--src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php4
-rwxr-xr-xsrc/php/tests/interop/interop_client.php2
-rwxr-xr-xsrc/php/tests/unit_tests/SecureEndToEndTest.php14
-rw-r--r--src/python/README.md37
-rw-r--r--src/python/grpcio/README.rst3
-rw-r--r--src/ruby/README.md38
-rw-r--r--src/ruby/ext/grpc/rb_call.c57
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb5
-rw-r--r--src/ruby/spec/call_spec.rb8
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb28
64 files changed, 930 insertions, 331 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 20d723bbc1..762a4edc73 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -216,7 +216,7 @@ static void process_send_ops(grpc_call_element *elem,
[calld->compression_algorithm]));
/* convey supported compression algorithms */
- grpc_metadata_batch_add_head(
+ grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->accept_encoding_storage,
GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 4990e6583a..1c402418e8 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -975,6 +975,11 @@ void grpc_server_setup_transport(grpc_server *s, grpc_transport *transport,
grpc_transport_perform_op(transport, &op);
}
+void done_published_shutdown(void *done_arg, grpc_cq_completion *storage) {
+ (void) done_arg;
+ gpr_free(storage);
+}
+
void grpc_server_shutdown_and_notify(grpc_server *server,
grpc_completion_queue *cq, void *tag) {
listener *l;
@@ -986,6 +991,12 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
/* lock, and gather up some stuff to do */
gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq);
+ if (server->shutdown_published) {
+ grpc_cq_end_op(cq, tag, 1, done_published_shutdown, NULL,
+ gpr_malloc(sizeof(grpc_cq_completion)));
+ gpr_mu_unlock(&server->mu_global);
+ return;
+ }
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 9695a0f14b..17f31c22cb 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -71,7 +71,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
} else {
const char* host_str = NULL;
if (!context->authority().empty()) {
- host_str = context->authority().c_str();
+ host_str = context->authority_.c_str();
} else if (!host_.empty()) {
host_str = host_.c_str();
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 05053ba5d5..1fe8f82dd0 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -136,6 +136,26 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
+ gpr_timespec deadline) {
+ void* tag = nullptr;
+ *ok = false;
+ switch (cq->AsyncNext(&tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ *req = nullptr;
+ return true;
+ case CompletionQueue::SHUTDOWN:
+ *req = nullptr;
+ return false;
+ case CompletionQueue::GOT_EVENT:
+ *req = static_cast<SyncRequest*>(tag);
+ GPR_ASSERT((*req)->in_flight_);
+ return true;
+ }
+ gpr_log(GPR_ERROR, "Should never reach here");
+ abort();
+ }
+
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -354,12 +374,27 @@ bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
return true;
}
-void Server::Shutdown() {
+void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
+ // Spin, eating requests until the completion queue is completely shutdown.
+ // If the deadline expires then cancel anything that's pending and keep
+ // spinning forever until the work is actually drained.
+ // Since nothing else needs to touch state guarded by mu_, holding it
+ // through this loop is fine.
+ SyncRequest* request;
+ bool ok;
+ while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
+ if (request == NULL) { // deadline expired
+ grpc_server_cancel_all_calls(server_);
+ deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ } else if (ok) {
+ SyncRequest::CallData call_data(this, request);
+ }
+ }
// Wait for running callbacks to finish.
while (num_running_cb_ != 0) {
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
index 2787572924..dfbd92879e 100644
--- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
@@ -41,12 +41,6 @@ namespace Grpc.Core.Tests
{
public class ChannelTest
{
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void Constructor_RejectsInvalidParams()
{
@@ -56,36 +50,33 @@ namespace Grpc.Core.Tests
[Test]
public void State_IdleAfterCreation()
{
- using (var channel = new Channel("localhost", Credentials.Insecure))
- {
- Assert.AreEqual(ChannelState.Idle, channel.State);
- }
+ var channel = new Channel("localhost", Credentials.Insecure);
+ Assert.AreEqual(ChannelState.Idle, channel.State);
+ channel.ShutdownAsync().Wait();
}
[Test]
public void WaitForStateChangedAsync_InvalidArgument()
{
- using (var channel = new Channel("localhost", Credentials.Insecure))
- {
- Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
- }
+ var channel = new Channel("localhost", Credentials.Insecure);
+ Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
+ channel.ShutdownAsync().Wait();
}
[Test]
public void ResolvedTarget()
{
- using (var channel = new Channel("127.0.0.1", Credentials.Insecure))
- {
- Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
- }
+ var channel = new Channel("127.0.0.1", Credentials.Insecure);
+ Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
+ channel.ShutdownAsync().Wait();
}
[Test]
- public void Dispose_IsIdempotent()
+ public void Shutdown_AllowedOnlyOnce()
{
var channel = new Channel("localhost", Credentials.Insecure);
- channel.Dispose();
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
+ Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult());
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index e49fdb5268..68279a2007 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -63,16 +63,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public async Task UnaryCall()
{
@@ -208,13 +202,6 @@ namespace Grpc.Core.Tests
}
[Test]
- public void UnaryCall_DisposedChannel()
- {
- channel.Dispose();
- Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
- }
-
- [Test]
public void UnaryCallPerformance()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
index 9547683f60..378c81851c 100644
--- a/src/csharp/Grpc.Core.Tests/CompressionTest.cs
+++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
@@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void WriteOptions_Unary()
{
diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
index db5f953b0e..2db3f286f7 100644
--- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
@@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public async Task PropagateCancellation()
{
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index d6a8f52570..ad4e94a695 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -64,6 +64,7 @@
<Link>Version.cs</Link>
</Compile>
<Compile Include="ClientBaseTest.cs" />
+ <Compile Include="ShutdownTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />
@@ -82,6 +83,7 @@
<Compile Include="ResponseHeadersTest.cs" />
<Compile Include="CompressionTest.cs" />
<Compile Include="ContextPropagationTest.cs" />
+ <Compile Include="MetadataTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 4ed93c7eca..4fdfab5a99 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -43,34 +43,40 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAndShutdownGrpcEnvironment()
{
- var env = GrpcEnvironment.GetInstance();
+ var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue);
- GrpcEnvironment.Shutdown();
+ GrpcEnvironment.Release();
}
[Test]
public void SubsequentInvocations()
{
- var env1 = GrpcEnvironment.GetInstance();
- var env2 = GrpcEnvironment.GetInstance();
+ var env1 = GrpcEnvironment.AddRef();
+ var env2 = GrpcEnvironment.AddRef();
Assert.IsTrue(object.ReferenceEquals(env1, env2));
- GrpcEnvironment.Shutdown();
- GrpcEnvironment.Shutdown();
+ GrpcEnvironment.Release();
+ GrpcEnvironment.Release();
}
[Test]
public void InitializeAfterShutdown()
{
- var env1 = GrpcEnvironment.GetInstance();
- GrpcEnvironment.Shutdown();
+ var env1 = GrpcEnvironment.AddRef();
+ GrpcEnvironment.Release();
- var env2 = GrpcEnvironment.GetInstance();
- GrpcEnvironment.Shutdown();
+ var env2 = GrpcEnvironment.AddRef();
+ GrpcEnvironment.Release();
Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
[Test]
+ public void ReleaseWithoutAddRef()
+ {
+ Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
+ }
+
+ [Test]
public void GetCoreVersionString()
{
var coreVersion = GrpcEnvironment.GetCoreVersionString();
diff --git a/src/csharp/Grpc.Core.Tests/MetadataTest.cs b/src/csharp/Grpc.Core.Tests/MetadataTest.cs
new file mode 100644
index 0000000000..c00f945d6a
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/MetadataTest.cs
@@ -0,0 +1,120 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class MetadataTest
+ {
+ [Test]
+ public void AsciiEntry()
+ {
+ var entry = new Metadata.Entry("ABC", "XYZ");
+ Assert.IsFalse(entry.IsBinary);
+ Assert.AreEqual("abc", entry.Key); // key is in lowercase.
+ Assert.AreEqual("XYZ", entry.Value);
+ CollectionAssert.AreEqual(new[] { (byte)'X', (byte)'Y', (byte)'Z' }, entry.ValueBytes);
+
+ Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc-bin", "xyz"));
+
+ Assert.AreEqual("[Entry: key=abc, value=XYZ]", entry.ToString());
+ }
+
+ [Test]
+ public void BinaryEntry()
+ {
+ var bytes = new byte[] { 1, 2, 3 };
+ var entry = new Metadata.Entry("ABC-BIN", bytes);
+ Assert.IsTrue(entry.IsBinary);
+ Assert.AreEqual("abc-bin", entry.Key); // key is in lowercase.
+ Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; });
+ CollectionAssert.AreEqual(bytes, entry.ValueBytes);
+
+ Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc", bytes));
+
+ Assert.AreEqual("[Entry: key=abc-bin, valueBytes=System.Byte[]]", entry.ToString());
+ }
+
+ [Test]
+ public void Entry_ConstructionPreconditions()
+ {
+ Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry(null, "xyz"));
+ Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc", (string)null));
+ Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc-bin", (byte[])null));
+ }
+
+ [Test]
+ public void Entry_Immutable()
+ {
+ var origBytes = new byte[] { 1, 2, 3 };
+ var bytes = new byte[] { 1, 2, 3 };
+ var entry = new Metadata.Entry("ABC-BIN", bytes);
+ bytes[0] = 255; // changing the array passed to constructor should have any effect.
+ CollectionAssert.AreEqual(origBytes, entry.ValueBytes);
+
+ entry.ValueBytes[0] = 255;
+ CollectionAssert.AreEqual(origBytes, entry.ValueBytes);
+ }
+
+ [Test]
+ public void Entry_CreateUnsafe_Ascii()
+ {
+ var bytes = new byte[] { (byte)'X', (byte)'y' };
+ var entry = Metadata.Entry.CreateUnsafe("abc", bytes);
+ Assert.IsFalse(entry.IsBinary);
+ Assert.AreEqual("abc", entry.Key);
+ Assert.AreEqual("Xy", entry.Value);
+ CollectionAssert.AreEqual(bytes, entry.ValueBytes);
+ }
+
+ [Test]
+ public void Entry_CreateUnsafe_Binary()
+ {
+ var bytes = new byte[] { 1, 2, 3 };
+ var entry = Metadata.Entry.CreateUnsafe("abc-bin", bytes);
+ Assert.IsTrue(entry.IsBinary);
+ Assert.AreEqual("abc-bin", entry.Key);
+ Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; });
+ CollectionAssert.AreEqual(bytes, entry.ValueBytes);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
index 981b8ea3c8..706006702e 100644
--- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
@@ -69,16 +69,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void WriteResponseHeaders_NullNotAllowed()
{
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index 485006ebac..e7193c843b 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -51,7 +51,6 @@ namespace Grpc.Core.Tests
};
server.Start();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
@@ -67,8 +66,7 @@ namespace Grpc.Core.Tests
Assert.Greater(boundPort.BoundPort, 0);
server.Start();
- server.ShutdownAsync();
- GrpcEnvironment.Shutdown();
+ server.ShutdownAsync().Wait();
}
[Test]
@@ -83,7 +81,6 @@ namespace Grpc.Core.Tests
Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build()));
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/ShutdownTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs
new file mode 100644
index 0000000000..a2be7ddd5e
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs
@@ -0,0 +1,77 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ShutdownTest
+ {
+ const string Host = "127.0.0.1";
+
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [Test]
+ public async Task AbandonedCall()
+ {
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ await requestStream.ToListAsync();
+ });
+
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1))));
+
+ channel.ShutdownAsync().Wait();
+ server.ShutdownAsync().Wait();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
index d875d601b9..41f661f62d 100644
--- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
@@ -65,16 +65,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void InfiniteDeadline()
{
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 64c6adf2bf..2f8519dfa3 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -45,14 +45,19 @@ namespace Grpc.Core
/// <summary>
/// gRPC Channel
/// </summary>
- public class Channel : IDisposable
+ public class Channel
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
+ readonly object myLock = new object();
+ readonly AtomicCounter activeCallCounter = new AtomicCounter();
+
readonly string target;
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
+
+ bool shutdownRequested;
bool disposed;
/// <summary>
@@ -65,7 +70,7 @@ namespace Grpc.Core
public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
this.target = Preconditions.CheckNotNull(target, "target");
- this.environment = GrpcEnvironment.GetInstance();
+ this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
EnsureUserAgentChannelOption(this.options);
@@ -172,12 +177,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Destroys the underlying channel.
+ /// Waits until there are no more active calls for this channel and then cleans up
+ /// resources used by this channel.
/// </summary>
- public void Dispose()
+ public async Task ShutdownAsync()
{
- Dispose(true);
- GC.SuppressFinalize(this);
+ lock (myLock)
+ {
+ Preconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+
+ var activeCallCount = activeCallCounter.Count;
+ if (activeCallCount > 0)
+ {
+ Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
+ }
+
+ handle.Dispose();
+
+ await Task.Run(() => GrpcEnvironment.Release());
}
internal ChannelSafeHandle Handle
@@ -196,13 +215,20 @@ namespace Grpc.Core
}
}
- protected virtual void Dispose(bool disposing)
+ internal void AddCallReference(object call)
{
- if (disposing && handle != null && !disposed)
- {
- disposed = true;
- handle.Dispose();
- }
+ activeCallCounter.Increment();
+
+ bool success = false;
+ handle.DangerousAddRef(ref success);
+ Preconditions.CheckState(success);
+ }
+
+ internal void RemoveCallReference(object call)
+ {
+ handle.DangerousRelease();
+
+ activeCallCounter.Decrement();
}
private static void EnsureUserAgentChannelOption(List<ChannelOption> options)
diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs
index 7bc100ca60..903449439b 100644
--- a/src/csharp/Grpc.Core/ClientBase.cs
+++ b/src/csharp/Grpc.Core/ClientBase.cs
@@ -119,7 +119,8 @@ namespace Grpc.Core
internal static string GetAuthUriBase(string target)
{
var match = ChannelTargetPattern.Match(target);
- if (!match.Success) {
+ if (!match.Success)
+ {
return null;
}
return "https://" + match.Groups[2].Value + "/";
diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs
index 2e4bfc9e47..a5bf1b5a70 100644
--- a/src/csharp/Grpc.Core/ContextPropagationToken.cs
+++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs
@@ -132,7 +132,6 @@ namespace Grpc.Core
bool propagateDeadline;
bool propagateCancellation;
-
/// <summary>
/// Creates new context propagation options.
/// </summary>
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 30d8c80235..0a44eead74 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -58,6 +58,7 @@ namespace Grpc.Core
static object staticLock = new object();
static GrpcEnvironment instance;
+ static int refCount;
static ILogger logger = new ConsoleLogger();
@@ -67,13 +68,14 @@ namespace Grpc.Core
bool isClosed;
/// <summary>
- /// Returns an instance of initialized gRPC environment.
- /// Subsequent invocations return the same instance unless Shutdown has been called first.
+ /// Returns a reference-counted instance of initialized gRPC environment.
+ /// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
/// </summary>
- internal static GrpcEnvironment GetInstance()
+ internal static GrpcEnvironment AddRef()
{
lock (staticLock)
{
+ refCount++;
if (instance == null)
{
instance = new GrpcEnvironment();
@@ -83,14 +85,16 @@ namespace Grpc.Core
}
/// <summary>
- /// Shuts down the gRPC environment if it was initialized before.
- /// Blocks until the environment has been fully shutdown.
+ /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
+ /// (and blocks until the environment has been fully shutdown).
/// </summary>
- public static void Shutdown()
+ internal static void Release()
{
lock (staticLock)
{
- if (instance != null)
+ Preconditions.CheckState(refCount > 0);
+ refCount--;
+ if (refCount == 0)
{
instance.Close();
instance = null;
@@ -125,12 +129,10 @@ namespace Grpc.Core
private GrpcEnvironment()
{
NativeLogRedirector.Redirect();
- grpcsharp_init();
+ GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
threadPool.Start();
- // TODO: use proper logging here
- Logger.Info("gRPC initialized.");
}
/// <summary>
@@ -175,6 +177,17 @@ namespace Grpc.Core
return Marshal.PtrToStringAnsi(ptr);
}
+
+ internal static void GrpcNativeInit()
+ {
+ grpcsharp_init();
+ }
+
+ internal static void GrpcNativeShutdown()
+ {
+ grpcsharp_shutdown();
+ }
+
/// <summary>
/// Shuts down this environment.
/// </summary>
@@ -185,12 +198,10 @@ namespace Grpc.Core
throw new InvalidOperationException("Close has already been called");
}
threadPool.Stop();
- grpcsharp_shutdown();
+ GrpcNativeShutdown();
isClosed = true;
debugStats.CheckOK();
-
- Logger.Info("gRPC shutdown.");
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 2c3e3d75ea..bb9ba5b8dd 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -311,9 +311,9 @@ namespace Grpc.Core.Internal
}
}
- protected override void OnReleaseResources()
+ protected override void OnAfterReleaseResources()
{
- details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.RemoveCallReference(this);
}
private void Initialize(CompletionQueueSafeHandle cq)
@@ -323,7 +323,9 @@ namespace Grpc.Core.Internal
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
- details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+
+ details.Channel.AddCallReference(this);
+
InitializeInternal(call);
RegisterCancellationCallback();
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 6ca4bbdafc..1808294f43 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -189,15 +189,15 @@ namespace Grpc.Core.Internal
private void ReleaseResources()
{
- OnReleaseResources();
if (call != null)
{
call.Dispose();
}
disposed = true;
+ OnAfterReleaseResources();
}
- protected virtual void OnReleaseResources()
+ protected virtual void OnAfterReleaseResources()
{
}
@@ -212,7 +212,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
- protected void CheckReadingAllowed()
+ protected virtual void CheckReadingAllowed()
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 3710a65d6b..6278c0191e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -50,16 +50,19 @@ namespace Grpc.Core.Internal
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly GrpcEnvironment environment;
+ readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer)
{
this.environment = Preconditions.CheckNotNull(environment);
+ this.server = Preconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
{
call.SetCompletionRegistry(environment.CompletionRegistry);
- environment.DebugStats.ActiveServerCalls.Increment();
+
+ server.AddCallReference(this);
InitializeInternal(call);
}
@@ -168,9 +171,15 @@ namespace Grpc.Core.Internal
}
}
- protected override void OnReleaseResources()
+ protected override void CheckReadingAllowed()
+ {
+ base.CheckReadingAllowed();
+ Preconditions.CheckArgument(!cancelRequested);
+ }
+
+ protected override void OnAfterReleaseResources()
{
- environment.DebugStats.ActiveServerCalls.Decrement();
+ server.RemoveCallReference(this);
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 6a2add54db..3a96414bea 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -134,7 +134,7 @@ namespace Grpc.Core.Internal
}
// Gets data of server_rpc_new completion.
- public ServerRpcNew GetServerRpcNew()
+ public ServerRpcNew GetServerRpcNew(Server server)
{
var call = grpcsharp_batch_context_server_rpc_new_call(this);
@@ -145,7 +145,7 @@ namespace Grpc.Core.Internal
IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
- return new ServerRpcNew(call, method, host, deadline, metadata);
+ return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion.
@@ -198,14 +198,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal struct ServerRpcNew
{
+ readonly Server server;
readonly CallSafeHandle call;
readonly string method;
readonly string host;
readonly Timespec deadline;
readonly Metadata requestMetadata;
- public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
+ public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
{
+ this.server = server;
this.call = call;
this.method = method;
this.host = host;
@@ -213,6 +215,14 @@ namespace Grpc.Core.Internal
this.requestMetadata = requestMetadata;
}
+ public Server Server
+ {
+ get
+ {
+ return this.server;
+ }
+ }
+
public CallSafeHandle Call
{
get
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 7f03bf4ea5..8cef566c14 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -68,11 +68,17 @@ namespace Grpc.Core.Internal
public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_insecure_channel_create(target, channelArgs);
}
public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
@@ -107,6 +113,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
+ GrpcEnvironment.GrpcNativeShutdown();
return true;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index 8793450ff3..1bea1adf9e 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -38,10 +38,6 @@ namespace Grpc.Core.Internal
{
internal class DebugStats
{
- public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
-
- public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
-
public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
/// <summary>
@@ -49,16 +45,6 @@ namespace Grpc.Core.Internal
/// </summary>
public void CheckOK()
{
- var remainingClientCalls = ActiveClientCalls.Count;
- if (remainingClientCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
- }
- var remainingServerCalls = ActiveServerCalls.Count;
- if (remainingServerCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
- }
var pendingBatchCompletions = PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index cb4c7c821e..4b7124ee74 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -83,8 +83,6 @@ namespace Grpc.Core.Internal
lock (myLock)
{
cq.Shutdown();
-
- Logger.Info("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
@@ -136,7 +134,6 @@ namespace Grpc.Core.Internal
}
}
while (ev.type != GRPCCompletionType.Shutdown);
- Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index 427c16fac6..83994f6762 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -70,7 +70,8 @@ namespace Grpc.Core.Internal
var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
{
- grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, metadata[i].ValueBytes, new UIntPtr((ulong)metadata[i].ValueBytes.Length));
+ var valueBytes = metadata[i].GetSerializedValueUnsafe();
+ grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
}
return metadataArray;
}
@@ -94,7 +95,7 @@ namespace Grpc.Core.Internal
string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index));
var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()];
Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length);
- metadata.Add(new Metadata.Entry(key, bytes));
+ metadata.Add(Metadata.Entry.CreateUnsafe(key, bytes));
}
return metadata;
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 688f9f6fec..59f4c5727c 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -67,7 +67,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -179,7 +179,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -239,7 +239,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -278,7 +278,7 @@ namespace Grpc.Core.Internal
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload, environment);
+ (payload) => payload, (payload) => payload, environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index f9b44b1acf..5ee7ac14e8 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -74,6 +74,9 @@ namespace Grpc.Core.Internal
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_server_create(cq, args);
}
@@ -109,6 +112,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_server_destroy(handle);
+ GrpcEnvironment.GrpcNativeShutdown();
return true;
}
diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
index 382481d871..35561d25d8 100644
--- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
+++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
@@ -51,7 +51,19 @@ namespace Grpc.Core.Logging
private ConsoleLogger(Type forType)
{
this.forType = forType;
- this.forTypeString = forType != null ? forType.FullName + " " : "";
+ if (forType != null)
+ {
+ var namespaceStr = forType.Namespace ?? "";
+ if (namespaceStr.Length > 0)
+ {
+ namespaceStr += ".";
+ }
+ this.forTypeString = namespaceStr + forType.Name + " ";
+ }
+ else
+ {
+ this.forTypeString = "";
+ }
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index 9db2abf46e..a589b50caa 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -46,6 +46,11 @@ namespace Grpc.Core
public sealed class Metadata : IList<Metadata.Entry>
{
/// <summary>
+ /// All binary headers should have this suffix.
+ /// </summary>
+ public const string BinaryHeaderSuffix = "-bin";
+
+ /// <summary>
/// An read-only instance of metadata containing no entries.
/// </summary>
public static readonly Metadata Empty = new Metadata().Freeze();
@@ -181,23 +186,49 @@ namespace Grpc.Core
private static readonly Encoding Encoding = Encoding.ASCII;
readonly string key;
- string value;
- byte[] valueBytes;
+ readonly string value;
+ readonly byte[] valueBytes;
+
+ private Entry(string key, string value, byte[] valueBytes)
+ {
+ this.key = key;
+ this.value = value;
+ this.valueBytes = valueBytes;
+ }
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct with a binary value.
+ /// </summary>
+ /// <param name="key">Metadata key, needs to have suffix indicating a binary valued metadata entry.</param>
+ /// <param name="valueBytes">Value bytes.</param>
public Entry(string key, byte[] valueBytes)
{
- this.key = Preconditions.CheckNotNull(key, "key");
+ this.key = NormalizeKey(key);
+ Preconditions.CheckArgument(this.key.EndsWith(BinaryHeaderSuffix),
+ "Key for binary valued metadata entry needs to have suffix indicating binary value.");
this.value = null;
- this.valueBytes = Preconditions.CheckNotNull(valueBytes, "valueBytes");
+ Preconditions.CheckNotNull(valueBytes, "valueBytes");
+ this.valueBytes = new byte[valueBytes.Length];
+ Buffer.BlockCopy(valueBytes, 0, this.valueBytes, 0, valueBytes.Length); // defensive copy to guarantee immutability
}
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct holding an ASCII value.
+ /// </summary>
+ /// <param name="key">Metadata key, must not use suffix indicating a binary valued metadata entry.</param>
+ /// <param name="value">Value string. Only ASCII characters are allowed.</param>
public Entry(string key, string value)
{
- this.key = Preconditions.CheckNotNull(key, "key");
+ this.key = NormalizeKey(key);
+ Preconditions.CheckArgument(!this.key.EndsWith(BinaryHeaderSuffix),
+ "Key for ASCII valued metadata entry cannot have suffix indicating binary value.");
this.value = Preconditions.CheckNotNull(value, "value");
this.valueBytes = null;
}
+ /// <summary>
+ /// Gets the metadata entry key.
+ /// </summary>
public string Key
{
get
@@ -206,33 +237,86 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the binary value of this metadata entry.
+ /// </summary>
public byte[] ValueBytes
{
get
{
if (valueBytes == null)
{
- valueBytes = Encoding.GetBytes(value);
+ return Encoding.GetBytes(value);
}
- return valueBytes;
+
+ // defensive copy to guarantee immutability
+ var bytes = new byte[valueBytes.Length];
+ Buffer.BlockCopy(valueBytes, 0, bytes, 0, valueBytes.Length);
+ return bytes;
}
}
+ /// <summary>
+ /// Gets the string value of this metadata entry.
+ /// </summary>
public string Value
{
get
{
- if (value == null)
- {
- value = Encoding.GetString(valueBytes);
- }
- return value;
+ Preconditions.CheckState(!IsBinary, "Cannot access string value of a binary metadata entry");
+ return value ?? Encoding.GetString(valueBytes);
}
}
-
+
+ /// <summary>
+ /// Returns <c>true</c> if this entry is a binary-value entry.
+ /// </summary>
+ public bool IsBinary
+ {
+ get
+ {
+ return value == null;
+ }
+ }
+
+ /// <summary>
+ /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Metadata+Entry"/>.
+ /// </summary>
public override string ToString()
{
- return string.Format("[Entry: key={0}, value={1}]", Key, Value);
+ if (IsBinary)
+ {
+ return string.Format("[Entry: key={0}, valueBytes={1}]", key, valueBytes);
+ }
+
+ return string.Format("[Entry: key={0}, value={1}]", key, value);
+ }
+
+ /// <summary>
+ /// Gets the serialized value for this entry. For binary metadata entries, this leaks
+ /// the internal <c>valueBytes</c> byte array and caller must not change contents of it.
+ /// </summary>
+ internal byte[] GetSerializedValueUnsafe()
+ {
+ return valueBytes ?? Encoding.GetBytes(value);
+ }
+
+ /// <summary>
+ /// Creates a binary value or ascii value metadata entry from data received from the native layer.
+ /// We trust C core to give us well-formed data, so we don't perform any checks or defensive copying.
+ /// </summary>
+ internal static Entry CreateUnsafe(string key, byte[] valueBytes)
+ {
+ if (key.EndsWith(BinaryHeaderSuffix))
+ {
+ return new Entry(key, null, valueBytes);
+ }
+ return new Entry(key, Encoding.GetString(valueBytes), null);
+ }
+
+ private static string NormalizeKey(string key)
+ {
+ return Preconditions.CheckNotNull(key, "key").ToLower();
}
}
}
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index c76f126026..28f1686e20 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -50,6 +50,8 @@ namespace Grpc.Core
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
+ readonly AtomicCounter activeCallCounter = new AtomicCounter();
+
readonly ServiceDefinitionCollection serviceDefinitions;
readonly ServerPortCollection ports;
readonly GrpcEnvironment environment;
@@ -73,7 +75,7 @@ namespace Grpc.Core
{
this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this);
- this.environment = GrpcEnvironment.GetInstance();
+ this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
@@ -106,6 +108,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// To allow awaiting termination of the server.
+ /// </summary>
+ public Task ShutdownTask
+ {
+ get
+ {
+ return shutdownTcs.Task;
+ }
+ }
+
+ /// <summary>
/// Starts the server.
/// </summary>
public void Start()
@@ -136,18 +149,9 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
- handle.Dispose();
- }
+ DisposeHandle();
- /// <summary>
- /// To allow awaiting termination of the server.
- /// </summary>
- public Task ShutdownTask
- {
- get
- {
- return shutdownTcs.Task;
- }
+ await Task.Run(() => GrpcEnvironment.Release());
}
/// <summary>
@@ -166,7 +170,22 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
- handle.Dispose();
+ DisposeHandle();
+ }
+
+ internal void AddCallReference(object call)
+ {
+ activeCallCounter.Increment();
+
+ bool success = false;
+ handle.DangerousAddRef(ref success);
+ Preconditions.CheckState(success);
+ }
+
+ internal void RemoveCallReference(object call)
+ {
+ handle.DangerousRelease();
+ activeCallCounter.Decrement();
}
/// <summary>
@@ -227,6 +246,16 @@ namespace Grpc.Core
}
}
+ private void DisposeHandle()
+ {
+ var activeCallCount = activeCallCounter.Count;
+ if (activeCallCount > 0)
+ {
+ Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount);
+ }
+ handle.Dispose();
+ }
+
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
@@ -254,7 +283,7 @@ namespace Grpc.Core
{
if (success)
{
- ServerRpcNew newRpc = ctx.GetServerRpcNew();
+ ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index f9839d99f1..abd95cb905 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -39,23 +39,21 @@ namespace math
{
public static void Main(string[] args)
{
- using (Channel channel = new Channel("127.0.0.1", 23456, Credentials.Insecure))
- {
- Math.IMathClient client = new Math.MathClient(channel);
- MathExamples.DivExample(client);
+ var channel = new Channel("127.0.0.1", 23456, Credentials.Insecure);
+ Math.IMathClient client = new Math.MathClient(channel);
+ MathExamples.DivExample(client);
- MathExamples.DivAsyncExample(client).Wait();
+ MathExamples.DivAsyncExample(client).Wait();
- MathExamples.FibExample(client).Wait();
+ MathExamples.FibExample(client).Wait();
- MathExamples.SumExample(client).Wait();
+ MathExamples.SumExample(client).Wait();
- MathExamples.DivManyExample(client).Wait();
+ MathExamples.DivManyExample(client).Wait();
- MathExamples.DependendRequestsExample(client).Wait();
- }
+ MathExamples.DependendRequestsExample(client).Wait();
- GrpcEnvironment.Shutdown();
+ channel.ShutdownAsync().Wait();
}
}
}
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index 5f7e717b0c..26bef646ec 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -56,7 +56,6 @@ namespace math
Console.ReadKey();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
}
}
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index fdef950f09..36c1c947bd 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -68,9 +68,8 @@ namespace math.Tests
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
index 024377e216..80c35fb197 100644
--- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
+++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
@@ -71,10 +71,9 @@ namespace Grpc.HealthCheck.Tests
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index f4b0a1028f..24c22273fb 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -37,13 +37,15 @@ using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
+using Google.Apis.Auth.OAuth2;
using Google.ProtocolBuffers;
+
using grpc.testing;
using Grpc.Auth;
using Grpc.Core;
using Grpc.Core.Utils;
+
using NUnit.Framework;
-using Google.Apis.Auth.OAuth2;
namespace Grpc.IntegrationTesting
{
@@ -118,12 +120,10 @@ namespace Grpc.IntegrationTesting
};
}
- using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions))
- {
- TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
- await RunTestCaseAsync(options.testCase, client);
- }
- GrpcEnvironment.Shutdown();
+ var channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions);
+ TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
+ await RunTestCaseAsync(options.testCase, client);
+ channel.ShutdownAsync().Wait();
}
private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client)
@@ -169,6 +169,9 @@ namespace Grpc.IntegrationTesting
case "cancel_after_first_response":
await RunCancelAfterFirstResponseAsync(client);
break;
+ case "timeout_on_sleeping_server":
+ await RunTimeoutOnSleepingServerAsync(client);
+ break;
case "benchmark_empty_unary":
RunBenchmarkEmptyUnary(client);
break;
@@ -458,6 +461,29 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
+ public static async Task RunTimeoutOnSleepingServerAsync(TestService.ITestServiceClient client)
+ {
+ Console.WriteLine("running timeout_on_sleeping_server");
+
+ var deadline = DateTime.UtcNow.AddMilliseconds(1);
+ using (var call = client.FullDuplexCall(deadline: deadline))
+ {
+ try
+ {
+ await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
+ .SetPayload(CreateZerosPayload(27182)).Build());
+ }
+ catch (InvalidOperationException)
+ {
+ // Deadline was reached before write has started. Eat the exception and continue.
+ }
+
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.DeadlineExceeded, ex.Status.StatusCode);
+ }
+ Console.WriteLine("Passed!");
+ }
+
// This is not an official interop test, but it's useful.
public static void RunBenchmarkEmptyUnary(TestService.ITestServiceClient client)
{
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 6fa721bc1c..f3158aeb45 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -75,9 +75,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
@@ -127,5 +126,11 @@ namespace Grpc.IntegrationTesting
{
await InteropClient.RunCancelAfterFirstResponseAsync(client);
}
+
+ [Test]
+ public async Task TimeoutOnSleepingServerAsync()
+ {
+ await InteropClient.RunTimeoutOnSleepingServerAsync(client);
+ }
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
index 504fd11857..0cc8b2cde1 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
@@ -107,8 +107,6 @@ namespace Grpc.IntegrationTesting
server.Start();
server.ShutdownTask.Wait();
-
- GrpcEnvironment.Shutdown();
}
private static ServerOptions ParseArguments(string[] args)
diff --git a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
index 1c398eb84e..842795374f 100644
--- a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
@@ -85,9 +85,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
diff --git a/src/node/README.md b/src/node/README.md
index 7d3d8c7fa1..b6411537c7 100644
--- a/src/node/README.md
+++ b/src/node/README.md
@@ -5,11 +5,35 @@ Alpha : Ready for early adopters
## PREREQUISITES
- `node`: This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package.
-- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
## INSTALLATION
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
-Run the following command to install gRPC Node.js.
+
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC NPM package
+
+```sh
+npm install grpc
+```
+
+**Mac OS X**
+
+Install [homebrew][]. Run the following command to install gRPC Node.js.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs
```
@@ -88,5 +112,5 @@ ServerCredentials
An object with factory methods for creating credential objects for servers.
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/node/ext/server_credentials.cc b/src/node/ext/server_credentials.cc
index 1b8e7b43fb..6e17197e16 100644
--- a/src/node/ext/server_credentials.cc
+++ b/src/node/ext/server_credentials.cc
@@ -41,6 +41,7 @@
namespace grpc {
namespace node {
+using v8::Array;
using v8::Exception;
using v8::External;
using v8::Function;
@@ -52,6 +53,7 @@ using v8::Local;
using v8::Object;
using v8::ObjectTemplate;
using v8::Persistent;
+using v8::String;
using v8::Value;
NanCallback *ServerCredentials::constructor;
@@ -122,25 +124,66 @@ NAN_METHOD(ServerCredentials::CreateSsl) {
// TODO: have the node API support multiple key/cert pairs.
NanScope();
char *root_certs = NULL;
- grpc_ssl_pem_key_cert_pair key_cert_pair;
if (::node::Buffer::HasInstance(args[0])) {
root_certs = ::node::Buffer::Data(args[0]);
} else if (!(args[0]->IsNull() || args[0]->IsUndefined())) {
return NanThrowTypeError(
"createSSl's first argument must be a Buffer if provided");
}
- if (!::node::Buffer::HasInstance(args[1])) {
- return NanThrowTypeError("createSsl's second argument must be a Buffer");
+ if (!args[1]->IsArray()) {
+ return NanThrowTypeError(
+ "createSsl's second argument must be a list of objects");
+ }
+ int force_client_auth = 0;
+ if (args[2]->IsBoolean()) {
+ force_client_auth = (int)args[2]->BooleanValue();
+ } else if (!(args[2]->IsUndefined() || args[2]->IsNull())) {
+ return NanThrowTypeError(
+ "createSsl's third argument must be a boolean if provided");
}
- key_cert_pair.private_key = ::node::Buffer::Data(args[1]);
- if (!::node::Buffer::HasInstance(args[2])) {
- return NanThrowTypeError("createSsl's third argument must be a Buffer");
+ Handle<Array> pair_list = Local<Array>::Cast(args[1]);
+ uint32_t key_cert_pair_count = pair_list->Length();
+ grpc_ssl_pem_key_cert_pair *key_cert_pairs = new grpc_ssl_pem_key_cert_pair[
+ key_cert_pair_count];
+
+ Handle<String> key_key = NanNew("private_key");
+ Handle<String> cert_key = NanNew("cert_chain");
+
+ for(uint32_t i = 0; i < key_cert_pair_count; i++) {
+ if (!pair_list->Get(i)->IsObject()) {
+ delete key_cert_pairs;
+ return NanThrowTypeError("Key/cert pairs must be objects");
+ }
+ Handle<Object> pair_obj = pair_list->Get(i)->ToObject();
+ if (!pair_obj->HasOwnProperty(key_key)) {
+ delete key_cert_pairs;
+ return NanThrowTypeError(
+ "Key/cert pairs must have a private_key and a cert_chain");
+ }
+ if (!pair_obj->HasOwnProperty(cert_key)) {
+ delete key_cert_pairs;
+ return NanThrowTypeError(
+ "Key/cert pairs must have a private_key and a cert_chain");
+ }
+ if (!::node::Buffer::HasInstance(pair_obj->Get(key_key))) {
+ delete key_cert_pairs;
+ return NanThrowTypeError("private_key must be a Buffer");
+ }
+ if (!::node::Buffer::HasInstance(pair_obj->Get(cert_key))) {
+ delete key_cert_pairs;
+ return NanThrowTypeError("cert_chain must be a Buffer");
+ }
+ key_cert_pairs[i].private_key = ::node::Buffer::Data(
+ pair_obj->Get(key_key));
+ key_cert_pairs[i].cert_chain = ::node::Buffer::Data(
+ pair_obj->Get(cert_key));
}
- key_cert_pair.cert_chain = ::node::Buffer::Data(args[2]);
- // TODO Add a force_client_auth parameter and pass it as the last parameter
- // here.
grpc_server_credentials *creds =
- grpc_ssl_server_credentials_create(root_certs, &key_cert_pair, 1, 0);
+ grpc_ssl_server_credentials_create(root_certs,
+ key_cert_pairs,
+ key_cert_pair_count,
+ force_client_auth);
+ delete key_cert_pairs;
if (creds == NULL) {
NanReturnNull();
}
diff --git a/src/node/health_check/health.js b/src/node/health_check/health.js
index 87e00197fe..84d7e0568e 100644
--- a/src/node/health_check/health.js
+++ b/src/node/health_check/health.js
@@ -45,17 +45,13 @@ function HealthImplementation(statusMap) {
this.statusMap = _.clone(statusMap);
}
-HealthImplementation.prototype.setStatus = function(host, service, status) {
- if (!this.statusMap[host]) {
- this.statusMap[host] = {};
- }
- this.statusMap[host][service] = status;
+HealthImplementation.prototype.setStatus = function(service, status) {
+ this.statusMap[service] = status;
};
HealthImplementation.prototype.check = function(call, callback){
- var host = call.request.host;
var service = call.request.service;
- var status = _.get(this.statusMap, [host, service], null);
+ var status = _.get(this.statusMap, service, null);
if (status === null) {
callback({code:grpc.status.NOT_FOUND});
} else {
diff --git a/src/node/health_check/health.proto b/src/node/health_check/health.proto
index d31df1e0a7..57f4aaa9c0 100644
--- a/src/node/health_check/health.proto
+++ b/src/node/health_check/health.proto
@@ -32,8 +32,7 @@ syntax = "proto3";
package grpc.health.v1alpha;
message HealthCheckRequest {
- string host = 1;
- string service = 2;
+ string service = 1;
}
message HealthCheckResponse {
@@ -47,4 +46,4 @@ message HealthCheckResponse {
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
-} \ No newline at end of file
+}
diff --git a/src/node/interop/interop_server.js b/src/node/interop/interop_server.js
index 1242a0f939..99155e9958 100644
--- a/src/node/interop/interop_server.js
+++ b/src/node/interop/interop_server.js
@@ -169,8 +169,8 @@ function getServer(port, tls) {
var key_data = fs.readFileSync(key_path);
var pem_data = fs.readFileSync(pem_path);
server_creds = grpc.ServerCredentials.createSsl(null,
- key_data,
- pem_data);
+ [{private_key: key_data,
+ cert_chain: pem_data}]);
} else {
server_creds = grpc.ServerCredentials.createInsecure();
}
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 48fe0dd3b7..7b7eae51d2 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -280,7 +280,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
}
var client_batch = {};
var message = serialize(argument);
- message.grpcWriteFlags = options.flags;
+ if (options) {
+ message.grpcWriteFlags = options.flags;
+ }
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
@@ -416,7 +418,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
}
var start_batch = {};
var message = serialize(argument);
- message.grpcWriteFlags = options.flags;
+ if (options) {
+ message.grpcWriteFlags = options.flags;
+ }
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
start_batch[grpc.opType.SEND_MESSAGE] = message;
diff --git a/src/node/test/health_test.js b/src/node/test/health_test.js
index be4ef1d251..04959f5f55 100644
--- a/src/node/test/health_test.js
+++ b/src/node/test/health_test.js
@@ -41,13 +41,9 @@ var grpc = require('../');
describe('Health Checking', function() {
var statusMap = {
- '': {
- '': 'SERVING',
- 'grpc.test.TestService': 'NOT_SERVING',
- },
- virtual_host: {
- 'grpc.test.TestService': 'SERVING'
- }
+ '': 'SERVING',
+ 'grpc.test.TestServiceNotServing': 'NOT_SERVING',
+ 'grpc.test.TestServiceServing': 'SERVING'
};
var healthServer = new grpc.Server();
healthServer.addProtoService(health.service,
@@ -71,15 +67,15 @@ describe('Health Checking', function() {
});
});
it('should say that a disabled service is NOT_SERVING', function(done) {
- healthClient.check({service: 'grpc.test.TestService'},
+ healthClient.check({service: 'grpc.test.TestServiceNotServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'NOT_SERVING');
done();
});
});
- it('should say that a service on another host is SERVING', function(done) {
- healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'},
+ it('should say that an enabled service is SERVING', function(done) {
+ healthClient.check({service: 'grpc.test.TestServiceServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'SERVING');
@@ -93,12 +89,4 @@ describe('Health Checking', function() {
done();
});
});
- it('should get NOT_FOUND if the host is not registered', function(done) {
- healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'},
- function(err, response) {
- assert(err);
- assert.strictEqual(err.code, grpc.status.NOT_FOUND);
- done();
- });
- });
});
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 20c9a07ffa..78bac8da29 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -70,7 +70,9 @@ describe('server', function() {
var pem_path = path.join(__dirname, '../test/data/server1.pem');
var key_data = fs.readFileSync(key_path);
var pem_data = fs.readFileSync(pem_path);
- var creds = grpc.ServerCredentials.createSsl(null, key_data, pem_data);
+ var creds = grpc.ServerCredentials.createSsl(null,
+ [{private_key: key_data,
+ cert_chain: pem_data}]);
assert.doesNotThrow(function() {
port = server.addHttp2Port('0.0.0.0:0', creds);
});
diff --git a/src/php/README.md b/src/php/README.md
index 1804606e09..f432935fde 100644
--- a/src/php/README.md
+++ b/src/php/README.md
@@ -7,17 +7,17 @@ This directory contains source code for PHP implementation of gRPC layered on sh
Alpha : Ready for early adopters
-## ENVIRONMENT
+## Environment
Prerequisite: PHP 5.5 or later, `phpunit`, `pecl`
-Linux:
+**Linux:**
```sh
$ sudo apt-get install php5 php5-dev phpunit php-pear
```
-OS X:
+**Mac OS X:**
```sh
$ curl https://phar.phpunit.de/phpunit.phar -o phpunit.phar
@@ -28,10 +28,39 @@ $ curl -O http://pear.php.net/go-pear.phar
$ sudo php -d detect_unicode=0 go-pear.phar
```
-## Build from Homebrew
+## Quick Install
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. Run the following command to
-install gRPC.
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC PHP extension
+
+```sh
+sudo pecl install grpc-alpha
+```
+
+**Mac OS X:**
+
+Install [homebrew][]. Example:
+
+```sh
+ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
+```
+
+Install the gRPC core library and the PHP extension in one step
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s php
@@ -39,6 +68,7 @@ $ curl -fsSL https://goo.gl/getgrpc | bash -s php
This will download and run the [gRPC install script][] and compile the gRPC PHP extension.
+
## Build from Source
Clone this repository
@@ -71,7 +101,7 @@ $ sudo make install
Install the gRPC PHP extension
```sh
-$ sudo pecl install grpc
+$ sudo pecl install grpc-alpha
```
OR
@@ -140,6 +170,6 @@ $ ./bin/run_gen_code_test.sh
```
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Node]:https://github.com/grpc/grpc/tree/master/src/node/examples
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 4e40dc43ce..252623d0c3 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -216,13 +216,18 @@ PHP_METHOD(Call, __construct) {
char *method;
int method_len;
zval *deadline_obj;
- /* "OsO" == 1 Object, 1 string, 1 Object */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO", &channel_obj,
- grpc_ce_channel, &method, &method_len,
- &deadline_obj, grpc_ce_timeval) == FAILURE) {
+ char *host_override = NULL;
+ int host_override_len = 0;
+ /* "OsO|s" == 1 Object, 1 string, 1 Object, 1 optional string */
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO|s",
+ &channel_obj, grpc_ce_channel,
+ &method, &method_len,
+ &deadline_obj, grpc_ce_timeval,
+ &host_override, &host_override_len)
+ == FAILURE) {
zend_throw_exception(
spl_ce_InvalidArgumentException,
- "Call expects a Channel, a String, and a Timeval",
+ "Call expects a Channel, a String, a Timeval and an optional String",
1 TSRMLS_CC);
return;
}
@@ -241,7 +246,7 @@ PHP_METHOD(Call, __construct) {
deadline_obj TSRMLS_CC);
call->wrapped = grpc_channel_create_call(
channel->wrapped, NULL, GRPC_PROPAGATE_DEFAULTS, completion_queue, method,
- channel->target, deadline->wrapped, NULL);
+ host_override, deadline->wrapped, NULL);
}
/**
@@ -273,7 +278,6 @@ PHP_METHOD(Call, startBatch) {
grpc_byte_buffer *message;
int cancelled;
grpc_call_error error;
- grpc_event event;
zval *result;
char *message_str;
size_t message_len;
@@ -409,14 +413,8 @@ PHP_METHOD(Call, startBatch) {
(long)error TSRMLS_CC);
goto cleanup;
}
- event = grpc_completion_queue_pluck(completion_queue, call->wrapped,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
- if (!event.success) {
- zend_throw_exception(spl_ce_LogicException,
- "The batch failed for some reason",
- 1 TSRMLS_CC);
- goto cleanup;
- }
+ grpc_completion_queue_pluck(completion_queue, call->wrapped,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
for (int i = 0; i < op_num; i++) {
switch(ops[i].op) {
case GRPC_OP_SEND_INITIAL_METADATA:
diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c
index f8ce04d902..7a981675de 100644
--- a/src/php/ext/grpc/channel.c
+++ b/src/php/ext/grpc/channel.c
@@ -64,7 +64,6 @@ void free_wrapped_grpc_channel(void *object TSRMLS_DC) {
if (channel->wrapped != NULL) {
grpc_channel_destroy(channel->wrapped);
}
- efree(channel->target);
efree(channel);
}
@@ -141,9 +140,6 @@ PHP_METHOD(Channel, __construct) {
HashTable *array_hash;
zval **creds_obj = NULL;
wrapped_grpc_credentials *creds = NULL;
- zval **override_obj;
- char *override;
- int override_len;
/* "s|a" == 1 string, 1 optional array */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a", &target,
&target_length, &args_array) == FAILURE) {
@@ -151,8 +147,6 @@ PHP_METHOD(Channel, __construct) {
"Channel expects a string and an array", 1 TSRMLS_CC);
return;
}
- override = target;
- override_len = target_length;
if (args_array == NULL) {
channel->wrapped = grpc_insecure_channel_create(target, NULL, NULL);
} else {
@@ -169,19 +163,6 @@ PHP_METHOD(Channel, __construct) {
*creds_obj TSRMLS_CC);
zend_hash_del(array_hash, "credentials", 12);
}
- if (zend_hash_find(array_hash, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
- sizeof(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG),
- (void **)&override_obj) == SUCCESS) {
- if (Z_TYPE_PP(override_obj) != IS_STRING) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
- " must be a string",
- 1 TSRMLS_CC);
- return;
- }
- override = Z_STRVAL_PP(override_obj);
- override_len = Z_STRLEN_PP(override_obj);
- }
php_grpc_read_args_array(args_array, &args);
if (creds == NULL) {
channel->wrapped = grpc_insecure_channel_create(target, &args, NULL);
@@ -192,8 +173,6 @@ PHP_METHOD(Channel, __construct) {
}
efree(args.args);
}
- channel->target = ecalloc(override_len + 1, sizeof(char));
- memcpy(channel->target, override, override_len);
}
/**
diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h
index c13fa4c6d7..78a16ed0c9 100755
--- a/src/php/ext/grpc/channel.h
+++ b/src/php/ext/grpc/channel.h
@@ -53,7 +53,6 @@ typedef struct wrapped_grpc_channel {
zend_object std;
grpc_channel *wrapped;
- char *target;
} wrapped_grpc_channel;
/* Initializes the Channel class */
diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
index 8b7e67f57c..287621d930 100644
--- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
+++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
@@ -39,6 +39,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
protected static $client;
protected static $timeout;
+ public function testWaitForNotReady() {
+ $this->assertFalse(self::$client->waitForReady(1));
+ }
+
+ public function testWaitForReady() {
+ $this->assertTrue(self::$client->waitForReady(250000));
+ }
+
public function testSimpleRequest() {
$div_arg = new math\DivArgs();
$div_arg->setDividend(7);
diff --git a/src/php/tests/generated_code/GeneratedCodeTest.php b/src/php/tests/generated_code/GeneratedCodeTest.php
index 1e4742fc80..a1a2ce81db 100755
--- a/src/php/tests/generated_code/GeneratedCodeTest.php
+++ b/src/php/tests/generated_code/GeneratedCodeTest.php
@@ -35,7 +35,7 @@ require 'AbstractGeneratedCodeTest.php';
class GeneratedCodeTest extends AbstractGeneratedCodeTest {
public static function setUpBeforeClass() {
- self::$client = new math\MathClient(new Grpc\BaseStub(
- getenv('GRPC_TEST_HOST'), []));
+ self::$client = new math\MathClient(
+ getenv('GRPC_TEST_HOST'), []);
}
}
diff --git a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
index f8ec1e7da8..68f57d34ad 100644
--- a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
+++ b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
@@ -35,13 +35,13 @@ require 'AbstractGeneratedCodeTest.php';
class GeneratedCodeWithCallbackTest extends AbstractGeneratedCodeTest {
public static function setUpBeforeClass() {
- self::$client = new math\MathClient(new Grpc\BaseStub(
+ self::$client = new math\MathClient(
getenv('GRPC_TEST_HOST'), ['update_metadata' =>
function($a_hash,
$client = array()) {
$a_copy = $a_hash;
$a_copy['foo'] = ['bar'];
return $a_copy;
- }]));
+ }]);
}
}
diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php
index 44e6242c29..376d306da0 100755
--- a/src/php/tests/interop/interop_client.php
+++ b/src/php/tests/interop/interop_client.php
@@ -271,7 +271,7 @@ function cancelAfterFirstResponse($stub) {
}
function timeoutOnSleepingServer($stub) {
- $call = $stub->FullDuplexCall(array('timeout' => 500000));
+ $call = $stub->FullDuplexCall(array('timeout' => 1000));
$request = new grpc\testing\StreamingOutputCallRequest();
$request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
$response_parameters = new grpc\testing\ResponseParameters();
diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php
index f91c006da5..60341b983d 100755
--- a/src/php/tests/unit_tests/SecureEndToEndTest.php
+++ b/src/php/tests/unit_tests/SecureEndToEndTest.php
@@ -40,13 +40,15 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
file_get_contents(dirname(__FILE__) . '/../data/server1.key'),
file_get_contents(dirname(__FILE__) . '/../data/server1.pem'));
$this->server = new Grpc\Server();
- $port = $this->server->addSecureHttp2Port('0.0.0.0:0',
+ $this->port = $this->server->addSecureHttp2Port('0.0.0.0:0',
$server_credentials);
$this->server->start();
+ $this->host_override = 'foo.test.google.fr';
$this->channel = new Grpc\Channel(
- 'localhost:' . $port,
+ 'localhost:' . $this->port,
[
- 'grpc.ssl_target_name_override' => 'foo.test.google.fr',
+ 'grpc.ssl_target_name_override' => $this->host_override,
+ 'grpc.default_authority' => $this->host_override,
'credentials' => $credentials
]);
}
@@ -61,7 +63,8 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$status_text = 'xyz';
$call = new Grpc\Call($this->channel,
'dummy_method',
- $deadline);
+ $deadline,
+ $this->host_override);
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
@@ -112,7 +115,8 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
- $deadline);
+ $deadline,
+ $this->host_override);
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
diff --git a/src/python/README.md b/src/python/README.md
index 2beb3a913a..de0142db05 100644
--- a/src/python/README.md
+++ b/src/python/README.md
@@ -9,12 +9,36 @@ Alpha : Ready for early adopters
PREREQUISITES
-------------
- Python 2.7, virtualenv, pip
-- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
INSTALLATION
-------------
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
-Run the following command to install gRPC Python.
+
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC Python module
+
+```sh
+sudo pip install grpcio
+```
+
+**Mac OS X**
+
+Install [homebrew][]. Run the following command to install gRPC Python.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s python
```
@@ -27,11 +51,6 @@ Please read our online documentation for a [Quick Start][] and a [detailed examp
BUILDING FROM SOURCE
---------------------
- Clone this repository
-- Build the gRPC core from the root of the
- [gRPC Git repository](https://github.com/grpc/grpc)
-```
-$ make shared_c static_c
-```
- Use build_python.sh to build the Python code and install it into a virtual environment
```
@@ -60,7 +79,7 @@ $ ../../tools/distrib/python/submit.py
```
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
[detailed example]:http://www.grpc.io/docs/installation/python.html
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index 00bdecf56f..c7b5a3bde4 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -6,7 +6,7 @@ Package for GRPC Python.
Dependencies
------------
-Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_.
+Ensure you have installed the gRPC core. On Mac OS X, install homebrew_.
Run the following command to install gRPC Python.
::
@@ -19,5 +19,4 @@ Otherwise, `install from source`_
.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source
.. _homebrew: http://brew.sh
-.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation
.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
diff --git a/src/ruby/README.md b/src/ruby/README.md
index 4b657c0bd4..f8902e34c5 100644
--- a/src/ruby/README.md
+++ b/src/ruby/README.md
@@ -12,12 +12,36 @@ PREREQUISITES
-------------
- Ruby 2.x. The gRPC API uses keyword args.
-- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
INSTALLATION
---------------
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
-Run the following command to install gRPC Ruby.
+
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC Ruby package
+
+```sh
+gem install grpc
+```
+
+**Mac OS X**
+
+Install [homebrew][]. Run the following command to install gRPC Ruby.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s ruby
```
@@ -26,12 +50,6 @@ This will download and run the [gRPC install script][], then install the latest
BUILD FROM SOURCE
---------------------
- Clone this repository
-- Build the gRPC C core
-E.g, from the root of the gRPC [Git repository](https://github.com/google/grpc)
-```sh
-$ cd ../..
-$ make && sudo make install
-```
- Install Ruby 2.x. Consider doing this with [RVM](http://rvm.io), it's a nice way of controlling
the exact ruby version that's used.
@@ -77,8 +95,8 @@ Directory structure is the layout for [ruby extensions][]
GRPC.logger.info("Answer: #{resp.inspect}")
```
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[ruby extensions]:http://guides.rubygems.org/gems-with-extensions/
[rubydoc]: http://www.rubydoc.info/gems/grpc
[grpc.io]: http://www.grpc.io/docs/installation/ruby.html
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 36c6818a7e..6b5beb6f5d 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -82,6 +82,10 @@ static ID id_metadata;
* received by the call and subsequently saved on it. */
static ID id_status;
+/* id_write_flag is name of the attribute used to access the write_flag
+ * saved on the call. */
+static ID id_write_flag;
+
/* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
static VALUE sym_send_message;
static VALUE sym_send_metadata;
@@ -240,6 +244,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
return rb_ivar_set(self, id_metadata, metadata);
}
+/*
+ call-seq:
+ write_flag = call.write_flag
+
+ Gets the write_flag value saved the call. */
+static VALUE grpc_rb_call_get_write_flag(VALUE self) {
+ return rb_ivar_get(self, id_write_flag);
+}
+
+/*
+ call-seq:
+ call.write_flag = write_flag
+
+ Saves the write_flag on the call. */
+static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
+ if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
+ rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
+ rb_obj_classname(write_flag));
+ return Qnil;
+ }
+
+ return rb_ivar_set(self, id_write_flag, write_flag);
+}
+
/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
to fill grpc_metadata_array.
@@ -437,17 +465,19 @@ typedef struct run_batch_stack {
grpc_status_code recv_status;
char *recv_status_details;
size_t recv_status_details_capacity;
+ uint write_flag;
} run_batch_stack;
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
-static void grpc_run_batch_stack_init(run_batch_stack *st) {
+static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
grpc_metadata_array_init(&st->recv_metadata);
grpc_metadata_array_init(&st->recv_trailing_metadata);
st->op_num = 0;
+ st->write_flag = write_flag;
}
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
@@ -477,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
+ st->ops[st->op_num].flags = 0;
switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
@@ -490,6 +521,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
case GRPC_OP_SEND_MESSAGE:
st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
RSTRING_PTR(this_value), RSTRING_LEN(this_value));
+ st->ops[st->op_num].flags = st->write_flag;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
@@ -525,7 +557,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
NUM2INT(this_op));
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
- st->ops[st->op_num].flags = 0;
st->ops[st->op_num].reserved = NULL;
st->op_num++;
}
@@ -604,6 +635,8 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_event ev;
grpc_call_error err;
VALUE result = Qnil;
+ VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
+ uint write_flag = 0;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
@@ -611,7 +644,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
return Qnil;
}
- grpc_run_batch_stack_init(&st);
+ if (rb_write_flag != Qnil) {
+ write_flag = NUM2UINT(rb_write_flag);
+ }
+ grpc_run_batch_stack_init(&st, write_flag);
grpc_run_batch_stack_fill_ops(&st, ops_hash);
/* call grpc_call_start_batch, then wait for it to complete using
@@ -638,6 +674,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
return result;
}
+static void Init_grpc_write_flags() {
+ /* Constants representing the write flags in grpc.h */
+ VALUE grpc_rb_mWriteFlags =
+ rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
+ rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
+ UINT2NUM(GRPC_WRITE_BUFFER_HINT));
+ rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
+ UINT2NUM(GRPC_WRITE_NO_COMPRESS));
+}
+
static void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
VALUE grpc_rb_mRpcErrors =
@@ -735,10 +781,14 @@ void Init_grpc_call() {
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
+ rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
+ rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
+ 1);
/* Ids used to support call attributes */
id_metadata = rb_intern("metadata");
id_status = rb_intern("status");
+ id_write_flag = rb_intern("write_flag");
/* Ids used by the c wrapping internals. */
id_cq = rb_intern("__cq");
@@ -766,6 +816,7 @@ void Init_grpc_call() {
Init_grpc_error_codes();
Init_grpc_op_codes();
+ Init_grpc_write_flags();
}
/* Gets the call from the ruby object */
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 17da401c6b..d9cb924735 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -59,7 +59,7 @@ module GRPC
include Core::CallOps
extend Forwardable
attr_reader(:deadline)
- def_delegators :@call, :cancel, :metadata
+ def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=
# client_invoke begins a client invocation.
#
@@ -484,6 +484,7 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status, :start_call, :wait)
+ :metadata, :status, :start_call, :wait, :write_flag,
+ :write_flag=)
end
end
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index 3c5d33ffcd..dd3c45f754 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -31,6 +31,14 @@ require 'grpc'
include GRPC::Core::StatusCodes
+describe GRPC::Core::WriteFlags do
+ it 'should define the known write flag values' do
+ m = GRPC::Core::WriteFlags
+ expect(m.const_get(:BUFFER_HINT)).to_not be_nil
+ expect(m.const_get(:NO_COMPRESS)).to_not be_nil
+ end
+end
+
describe GRPC::Core::RpcErrors do
before(:each) do
@known_types = {
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 26208b714a..fcd7bd082f 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -35,6 +35,7 @@ describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
CallOps = GRPC::Core::CallOps
+ WriteFlags = GRPC::Core::WriteFlags
before(:each) do
@pass_through = proc { |x| x }
@@ -129,6 +130,31 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
+
+ TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
+ TEST_WRITE_FLAGS.each do |f|
+ it "successfully makes calls with write_flag set to #{f}" do
+ call = make_test_call
+ ActiveCall.client_invoke(call, @client_queue)
+ marshal = proc { |x| 'marshalled:' + x }
+ client_call = ActiveCall.new(call, @client_queue, marshal,
+ @pass_through, deadline)
+ msg = 'message is a string'
+ client_call.write_flag = f
+ client_call.remote_send(msg)
+
+ # confirm that the message was marshalled
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_call = recvd_rpc.call
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
+ @pass_through, deadline)
+ expect(server_call.remote_read).to eq('marshalled:' + msg)
+ end
+ end
end
describe '#client_invoke' do
@@ -261,7 +287,7 @@ describe GRPC::ActiveCall do
client_call.writes_done(false)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
- n = 3 # arbitrary value > 1
+ n = 3 # arbitrary value > 1
n.times do
server_call.remote_send(reply)
expect(e.next).to eq(reply)