aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/compress_filter.c2
-rw-r--r--src/cpp/client/channel.cc2
-rw-r--r--src/cpp/server/server.cc37
-rw-r--r--src/csharp/Grpc.Auth/AuthInterceptors.cs84
-rw-r--r--src/csharp/Grpc.Auth/Grpc.Auth.csproj42
-rw-r--r--src/csharp/Grpc.Auth/OAuth2Interceptors.cs115
-rw-r--r--src/csharp/Grpc.Core.Tests/ChannelTest.cs33
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs15
-rw-r--r--src/csharp/Grpc.Core.Tests/CompressionTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs26
-rw-r--r--src/csharp/Grpc.Core.Tests/MetadataTest.cs120
-rw-r--r--src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs8
-rw-r--r--src/csharp/Grpc.Core.Tests/ServerTest.cs5
-rw-r--r--src/csharp/Grpc.Core.Tests/ShutdownTest.cs77
-rw-r--r--src/csharp/Grpc.Core.Tests/TimeoutsTest.cs8
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs18
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs2
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Channel.cs50
-rw-r--r--src/csharp/Grpc.Core/ChannelOptions.cs2
-rw-r--r--src/csharp/Grpc.Core/ClientBase.cs21
-rw-r--r--src/csharp/Grpc.Core/ContextPropagationToken.cs1
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs37
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs8
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs6
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs17
-rw-r--r--src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs16
-rw-r--r--src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/DebugStats.cs14
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs10
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs4
-rw-r--r--src/csharp/Grpc.Core/Logging/ConsoleLogger.cs14
-rw-r--r--src/csharp/Grpc.Core/Metadata.cs112
-rw-r--r--src/csharp/Grpc.Core/Method.cs29
-rw-r--r--src/csharp/Grpc.Core/Server.cs57
-rw-r--r--src/csharp/Grpc.Examples.MathClient/MathClient.cs20
-rw-r--r--src/csharp/Grpc.Examples.MathServer/MathServer.cs1
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs3
-rw-r--r--src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs52
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs9
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropServer.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs3
-rw-r--r--src/node/README.md32
-rw-r--r--src/node/ext/call.cc15
-rw-r--r--src/node/ext/call.h13
-rw-r--r--src/node/ext/node_grpc.cc11
-rw-r--r--src/node/ext/server.cc2
-rw-r--r--src/node/health_check/health.js10
-rw-r--r--src/node/health_check/health.proto5
-rw-r--r--src/node/index.js5
-rw-r--r--src/node/src/client.js22
-rw-r--r--src/node/src/server.js26
-rw-r--r--src/node/test/health_test.js24
-rw-r--r--src/php/README.md46
-rw-r--r--src/php/ext/grpc/call.c28
-rw-r--r--src/php/ext/grpc/channel.c21
-rwxr-xr-xsrc/php/ext/grpc/channel.h1
-rw-r--r--src/php/tests/generated_code/AbstractGeneratedCodeTest.php8
-rwxr-xr-xsrc/php/tests/generated_code/GeneratedCodeTest.php4
-rw-r--r--src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php4
-rwxr-xr-xsrc/php/tests/interop/interop_client.php2
-rwxr-xr-xsrc/php/tests/unit_tests/SecureEndToEndTest.php14
-rw-r--r--src/python/README.md37
-rw-r--r--src/python/grpcio/README.rst3
-rw-r--r--src/ruby/README.md38
-rw-r--r--src/ruby/ext/grpc/rb_call.c57
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb5
-rw-r--r--src/ruby/lib/grpc/logconfig.rb3
-rw-r--r--src/ruby/spec/call_spec.rb8
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb28
75 files changed, 1093 insertions, 499 deletions
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 20d723bbc1..762a4edc73 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -216,7 +216,7 @@ static void process_send_ops(grpc_call_element *elem,
[calld->compression_algorithm]));
/* convey supported compression algorithms */
- grpc_metadata_batch_add_head(
+ grpc_metadata_batch_add_tail(
&(sop->data.metadata), &calld->accept_encoding_storage,
GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 9695a0f14b..17f31c22cb 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -71,7 +71,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
} else {
const char* host_str = NULL;
if (!context->authority().empty()) {
- host_str = context->authority().c_str();
+ host_str = context->authority_.c_str();
} else if (!host_.empty()) {
host_str = host_.c_str();
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 27472f4880..e039c07374 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -90,6 +90,26 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
+ gpr_timespec deadline) {
+ void* tag = nullptr;
+ *ok = false;
+ switch (cq->AsyncNext(&tag, ok, deadline)) {
+ case CompletionQueue::TIMEOUT:
+ *req = nullptr;
+ return true;
+ case CompletionQueue::SHUTDOWN:
+ *req = nullptr;
+ return false;
+ case CompletionQueue::GOT_EVENT:
+ *req = static_cast<SyncRequest*>(tag);
+ GPR_ASSERT((*req)->in_flight_);
+ return true;
+ }
+ gpr_log(GPR_ERROR, "Should never reach here");
+ abort();
+ }
+
void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
@@ -303,12 +323,27 @@ bool Server::Start() {
return true;
}
-void Server::Shutdown() {
+void Server::ShutdownInternal(gpr_timespec deadline) {
grpc::unique_lock<grpc::mutex> lock(mu_);
if (started_ && !shutdown_) {
shutdown_ = true;
grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
cq_.Shutdown();
+ // Spin, eating requests until the completion queue is completely shutdown.
+ // If the deadline expires then cancel anything that's pending and keep
+ // spinning forever until the work is actually drained.
+ // Since nothing else needs to touch state guarded by mu_, holding it
+ // through this loop is fine.
+ SyncRequest* request;
+ bool ok;
+ while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
+ if (request == NULL) { // deadline expired
+ grpc_server_cancel_all_calls(server_);
+ deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ } else if (ok) {
+ SyncRequest::CallData call_data(this, request);
+ }
+ }
// Wait for running callbacks to finish.
while (num_running_cb_ != 0) {
diff --git a/src/csharp/Grpc.Auth/AuthInterceptors.cs b/src/csharp/Grpc.Auth/AuthInterceptors.cs
new file mode 100644
index 0000000000..61338f7f0e
--- /dev/null
+++ b/src/csharp/Grpc.Auth/AuthInterceptors.cs
@@ -0,0 +1,84 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Threading;
+
+using Google.Apis.Auth.OAuth2;
+using Grpc.Core;
+using Grpc.Core.Utils;
+
+namespace Grpc.Auth
+{
+ /// <summary>
+ /// Factory methods to create authorization interceptors.
+ /// </summary>
+ public static class AuthInterceptors
+ {
+ private const string AuthorizationHeader = "Authorization";
+ private const string Schema = "Bearer";
+
+ /// <summary>
+ /// Creates interceptor that will obtain access token from any credential type that implements
+ /// <c>ITokenAccess</c>. (e.g. <c>GoogleCredential</c>).
+ /// </summary>
+ public static HeaderInterceptor FromCredential(ITokenAccess credential)
+ {
+ return new HeaderInterceptor((method, authUri, metadata) =>
+ {
+ // TODO(jtattermusch): Rethink synchronous wait to obtain the result.
+ var accessToken = credential.GetAccessTokenForRequestAsync(authUri, CancellationToken.None)
+ .ConfigureAwait(false).GetAwaiter().GetResult();
+ metadata.Add(CreateBearerTokenHeader(accessToken));
+ });
+ }
+
+ /// <summary>
+ /// Creates OAuth2 interceptor that will use given access token as authorization.
+ /// </summary>
+ /// <param name="accessToken">OAuth2 access token.</param>
+ public static HeaderInterceptor FromAccessToken(string accessToken)
+ {
+ Preconditions.CheckNotNull(accessToken);
+ return new HeaderInterceptor((method, authUri, metadata) =>
+ {
+ metadata.Add(CreateBearerTokenHeader(accessToken));
+ });
+ }
+
+ private static Metadata.Entry CreateBearerTokenHeader(string accessToken)
+ {
+ return new Metadata.Entry(AuthorizationHeader, Schema + " " + accessToken);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Auth/Grpc.Auth.csproj b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
index 930a34b0c3..4fb087d4a3 100644
--- a/src/csharp/Grpc.Auth/Grpc.Auth.csproj
+++ b/src/csharp/Grpc.Auth/Grpc.Auth.csproj
@@ -3,8 +3,6 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>10.0.0</ProductVersion>
- <SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{AE21D0EE-9A2C-4C15-AB7F-5224EED5B0EA}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>Grpc.Auth</RootNamespace>
@@ -41,57 +39,47 @@
<AssemblyOriginatorKeyFile>C:\keys\Grpc.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>
<ItemGroup>
- <Reference Include="BouncyCastle.Crypto, Version=1.7.4137.9688, Culture=neutral, PublicKeyToken=a4292a325f69b123, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="System" />
+ <Reference Include="System.Net" />
+ <Reference Include="System.Net.Http" />
+ <Reference Include="System.Net.Http.WebRequest" />
+ <Reference Include="BouncyCastle.Crypto">
<HintPath>..\packages\BouncyCastle.1.7.0\lib\Net40-Client\BouncyCastle.Crypto.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="Google.Apis.Auth">
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Auth.PlatformServices, Version=1.9.3.19383, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="Google.Apis.Auth.PlatformServices">
<HintPath>..\packages\Google.Apis.Auth.1.9.3\lib\net40\Google.Apis.Auth.PlatformServices.dll</HintPath>
</Reference>
- <Reference Include="Google.Apis.Core, Version=1.9.3.19379, Culture=neutral, PublicKeyToken=4b01fa6e34db77ab, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="Google.Apis.Core">
<HintPath>..\packages\Google.Apis.Core.1.9.3\lib\portable-net40+sl50+win+wpa81+wp80\Google.Apis.Core.dll</HintPath>
</Reference>
- <Reference Include="Microsoft.Threading.Tasks, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="Microsoft.Threading.Tasks">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.dll</HintPath>
</Reference>
- <Reference Include="Microsoft.Threading.Tasks.Extensions, Version=1.0.12.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="Microsoft.Threading.Tasks.Extensions">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.dll</HintPath>
</Reference>
- <Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop, Version=1.0.168.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="Microsoft.Threading.Tasks.Extensions.Desktop">
<HintPath>..\packages\Microsoft.Bcl.Async.1.0.168\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll</HintPath>
</Reference>
- <Reference Include="Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="Newtonsoft.Json">
<HintPath>..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll</HintPath>
</Reference>
- <Reference Include="System" />
- <Reference Include="System.Net" />
- <Reference Include="System.Net.Http" />
- <Reference Include="System.Net.Http.Extensions, Version=2.2.29.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="System.Net.Http.Extensions">
<HintPath>..\packages\Microsoft.Net.Http.2.2.29\lib\net45\System.Net.Http.Extensions.dll</HintPath>
</Reference>
- <Reference Include="System.Net.Http.Primitives, Version=4.2.29.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
- <SpecificVersion>False</SpecificVersion>
+ <Reference Include="System.Net.Http.Primitives">
<HintPath>..\packages\Microsoft.Net.Http.2.2.29\lib\net45\System.Net.Http.Primitives.dll</HintPath>
</Reference>
- <Reference Include="System.Net.Http.WebRequest" />
</ItemGroup>
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs">
<Link>Version.cs</Link>
</Compile>
<Compile Include="Properties\AssemblyInfo.cs" />
- <Compile Include="OAuth2Interceptors.cs" />
+ <Compile Include="AuthInterceptors.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Auth/OAuth2Interceptors.cs b/src/csharp/Grpc.Auth/OAuth2Interceptors.cs
deleted file mode 100644
index d628a83246..0000000000
--- a/src/csharp/Grpc.Auth/OAuth2Interceptors.cs
+++ /dev/null
@@ -1,115 +0,0 @@
-#region Copyright notice and license
-
-// Copyright 2015, Google Inc.
-// All rights reserved.
-//
-// Redistribution and use in source and binary forms, with or without
-// modification, are permitted provided that the following conditions are
-// met:
-//
-// * Redistributions of source code must retain the above copyright
-// notice, this list of conditions and the following disclaimer.
-// * Redistributions in binary form must reproduce the above
-// copyright notice, this list of conditions and the following disclaimer
-// in the documentation and/or other materials provided with the
-// distribution.
-// * Neither the name of Google Inc. nor the names of its
-// contributors may be used to endorse or promote products derived from
-// this software without specific prior written permission.
-//
-// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-#endregion
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.IO;
-using System.Security.Cryptography.X509Certificates;
-using System.Text.RegularExpressions;
-using System.Threading;
-using System.Threading.Tasks;
-
-using Google.Apis.Auth.OAuth2;
-using Google.Apis.Util;
-using Grpc.Core;
-using Grpc.Core.Utils;
-
-namespace Grpc.Auth
-{
- public static class OAuth2Interceptors
- {
- /// <summary>
- /// Creates OAuth2 interceptor that will obtain access token from GoogleCredentials.
- /// </summary>
- public static MetadataInterceptorDelegate FromCredential(GoogleCredential googleCredential)
- {
- var interceptor = new OAuth2Interceptor(googleCredential, SystemClock.Default);
- return new MetadataInterceptorDelegate(interceptor.InterceptHeaders);
- }
-
- /// <summary>
- /// Creates OAuth2 interceptor that will use given OAuth2 token.
- /// </summary>
- /// <param name="oauth2Token"></param>
- /// <returns></returns>
- public static MetadataInterceptorDelegate FromAccessToken(string oauth2Token)
- {
- Preconditions.CheckNotNull(oauth2Token);
- return new MetadataInterceptorDelegate((authUri, metadata) =>
- {
- metadata.Add(OAuth2Interceptor.CreateBearerTokenHeader(oauth2Token));
- });
- }
-
- /// <summary>
- /// Injects OAuth2 authorization header into initial metadata (= request headers).
- /// </summary>
- private class OAuth2Interceptor
- {
- private const string AuthorizationHeader = "Authorization";
- private const string Schema = "Bearer";
-
- private ITokenAccess credential;
- private IClock clock;
-
- public OAuth2Interceptor(ITokenAccess credential, IClock clock)
- {
- this.credential = credential;
- this.clock = clock;
- }
-
- /// <summary>
- /// Gets access token and requests refreshing it if is going to expire soon.
- /// </summary>
- /// <param name="cancellationToken"></param>
- /// <returns></returns>
- public string GetAccessToken(string authUri, CancellationToken cancellationToken)
- {
- // TODO(jtattermusch): Rethink synchronous wait to obtain the result.
- return credential.GetAccessTokenForRequestAsync(authUri, cancellationToken: cancellationToken).GetAwaiter().GetResult();
- }
-
- public void InterceptHeaders(string authUri, Metadata metadata)
- {
- var accessToken = GetAccessToken(authUri, CancellationToken.None);
- metadata.Add(CreateBearerTokenHeader(accessToken));
- }
-
- public static Metadata.Entry CreateBearerTokenHeader(string accessToken)
- {
- return new Metadata.Entry(AuthorizationHeader, Schema + " " + accessToken);
- }
- }
- }
-}
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
index 2787572924..dfbd92879e 100644
--- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
@@ -41,12 +41,6 @@ namespace Grpc.Core.Tests
{
public class ChannelTest
{
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void Constructor_RejectsInvalidParams()
{
@@ -56,36 +50,33 @@ namespace Grpc.Core.Tests
[Test]
public void State_IdleAfterCreation()
{
- using (var channel = new Channel("localhost", Credentials.Insecure))
- {
- Assert.AreEqual(ChannelState.Idle, channel.State);
- }
+ var channel = new Channel("localhost", Credentials.Insecure);
+ Assert.AreEqual(ChannelState.Idle, channel.State);
+ channel.ShutdownAsync().Wait();
}
[Test]
public void WaitForStateChangedAsync_InvalidArgument()
{
- using (var channel = new Channel("localhost", Credentials.Insecure))
- {
- Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
- }
+ var channel = new Channel("localhost", Credentials.Insecure);
+ Assert.Throws(typeof(ArgumentException), () => channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
+ channel.ShutdownAsync().Wait();
}
[Test]
public void ResolvedTarget()
{
- using (var channel = new Channel("127.0.0.1", Credentials.Insecure))
- {
- Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
- }
+ var channel = new Channel("127.0.0.1", Credentials.Insecure);
+ Assert.IsTrue(channel.ResolvedTarget.Contains("127.0.0.1"));
+ channel.ShutdownAsync().Wait();
}
[Test]
- public void Dispose_IsIdempotent()
+ public void Shutdown_AllowedOnlyOnce()
{
var channel = new Channel("localhost", Credentials.Insecure);
- channel.Dispose();
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
+ Assert.Throws(typeof(InvalidOperationException), () => channel.ShutdownAsync().GetAwaiter().GetResult());
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index e49fdb5268..68279a2007 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -63,16 +63,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public async Task UnaryCall()
{
@@ -208,13 +202,6 @@ namespace Grpc.Core.Tests
}
[Test]
- public void UnaryCall_DisposedChannel()
- {
- channel.Dispose();
- Assert.Throws(typeof(ObjectDisposedException), () => Calls.BlockingUnaryCall(helper.CreateUnaryCall(), "ABC"));
- }
-
- [Test]
public void UnaryCallPerformance()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
diff --git a/src/csharp/Grpc.Core.Tests/CompressionTest.cs b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
index 9547683f60..378c81851c 100644
--- a/src/csharp/Grpc.Core.Tests/CompressionTest.cs
+++ b/src/csharp/Grpc.Core.Tests/CompressionTest.cs
@@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void WriteOptions_Unary()
{
diff --git a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
index db5f953b0e..2db3f286f7 100644
--- a/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ContextPropagationTest.cs
@@ -62,16 +62,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public async Task PropagateCancellation()
{
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index d6a8f52570..ad4e94a695 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -64,6 +64,7 @@
<Link>Version.cs</Link>
</Compile>
<Compile Include="ClientBaseTest.cs" />
+ <Compile Include="ShutdownTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="ClientServerTest.cs" />
<Compile Include="ServerTest.cs" />
@@ -82,6 +83,7 @@
<Compile Include="ResponseHeadersTest.cs" />
<Compile Include="CompressionTest.cs" />
<Compile Include="ContextPropagationTest.cs" />
+ <Compile Include="MetadataTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 4ed93c7eca..4fdfab5a99 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -43,34 +43,40 @@ namespace Grpc.Core.Tests
[Test]
public void InitializeAndShutdownGrpcEnvironment()
{
- var env = GrpcEnvironment.GetInstance();
+ var env = GrpcEnvironment.AddRef();
Assert.IsNotNull(env.CompletionQueue);
- GrpcEnvironment.Shutdown();
+ GrpcEnvironment.Release();
}
[Test]
public void SubsequentInvocations()
{
- var env1 = GrpcEnvironment.GetInstance();
- var env2 = GrpcEnvironment.GetInstance();
+ var env1 = GrpcEnvironment.AddRef();
+ var env2 = GrpcEnvironment.AddRef();
Assert.IsTrue(object.ReferenceEquals(env1, env2));
- GrpcEnvironment.Shutdown();
- GrpcEnvironment.Shutdown();
+ GrpcEnvironment.Release();
+ GrpcEnvironment.Release();
}
[Test]
public void InitializeAfterShutdown()
{
- var env1 = GrpcEnvironment.GetInstance();
- GrpcEnvironment.Shutdown();
+ var env1 = GrpcEnvironment.AddRef();
+ GrpcEnvironment.Release();
- var env2 = GrpcEnvironment.GetInstance();
- GrpcEnvironment.Shutdown();
+ var env2 = GrpcEnvironment.AddRef();
+ GrpcEnvironment.Release();
Assert.IsFalse(object.ReferenceEquals(env1, env2));
}
[Test]
+ public void ReleaseWithoutAddRef()
+ {
+ Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
+ }
+
+ [Test]
public void GetCoreVersionString()
{
var coreVersion = GrpcEnvironment.GetCoreVersionString();
diff --git a/src/csharp/Grpc.Core.Tests/MetadataTest.cs b/src/csharp/Grpc.Core.Tests/MetadataTest.cs
new file mode 100644
index 0000000000..c00f945d6a
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/MetadataTest.cs
@@ -0,0 +1,120 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class MetadataTest
+ {
+ [Test]
+ public void AsciiEntry()
+ {
+ var entry = new Metadata.Entry("ABC", "XYZ");
+ Assert.IsFalse(entry.IsBinary);
+ Assert.AreEqual("abc", entry.Key); // key is in lowercase.
+ Assert.AreEqual("XYZ", entry.Value);
+ CollectionAssert.AreEqual(new[] { (byte)'X', (byte)'Y', (byte)'Z' }, entry.ValueBytes);
+
+ Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc-bin", "xyz"));
+
+ Assert.AreEqual("[Entry: key=abc, value=XYZ]", entry.ToString());
+ }
+
+ [Test]
+ public void BinaryEntry()
+ {
+ var bytes = new byte[] { 1, 2, 3 };
+ var entry = new Metadata.Entry("ABC-BIN", bytes);
+ Assert.IsTrue(entry.IsBinary);
+ Assert.AreEqual("abc-bin", entry.Key); // key is in lowercase.
+ Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; });
+ CollectionAssert.AreEqual(bytes, entry.ValueBytes);
+
+ Assert.Throws(typeof(ArgumentException), () => new Metadata.Entry("abc", bytes));
+
+ Assert.AreEqual("[Entry: key=abc-bin, valueBytes=System.Byte[]]", entry.ToString());
+ }
+
+ [Test]
+ public void Entry_ConstructionPreconditions()
+ {
+ Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry(null, "xyz"));
+ Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc", (string)null));
+ Assert.Throws(typeof(ArgumentNullException), () => new Metadata.Entry("abc-bin", (byte[])null));
+ }
+
+ [Test]
+ public void Entry_Immutable()
+ {
+ var origBytes = new byte[] { 1, 2, 3 };
+ var bytes = new byte[] { 1, 2, 3 };
+ var entry = new Metadata.Entry("ABC-BIN", bytes);
+ bytes[0] = 255; // changing the array passed to constructor should have any effect.
+ CollectionAssert.AreEqual(origBytes, entry.ValueBytes);
+
+ entry.ValueBytes[0] = 255;
+ CollectionAssert.AreEqual(origBytes, entry.ValueBytes);
+ }
+
+ [Test]
+ public void Entry_CreateUnsafe_Ascii()
+ {
+ var bytes = new byte[] { (byte)'X', (byte)'y' };
+ var entry = Metadata.Entry.CreateUnsafe("abc", bytes);
+ Assert.IsFalse(entry.IsBinary);
+ Assert.AreEqual("abc", entry.Key);
+ Assert.AreEqual("Xy", entry.Value);
+ CollectionAssert.AreEqual(bytes, entry.ValueBytes);
+ }
+
+ [Test]
+ public void Entry_CreateUnsafe_Binary()
+ {
+ var bytes = new byte[] { 1, 2, 3 };
+ var entry = Metadata.Entry.CreateUnsafe("abc-bin", bytes);
+ Assert.IsTrue(entry.IsBinary);
+ Assert.AreEqual("abc-bin", entry.Key);
+ Assert.Throws(typeof(InvalidOperationException), () => { var v = entry.Value; });
+ CollectionAssert.AreEqual(bytes, entry.ValueBytes);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
index 981b8ea3c8..706006702e 100644
--- a/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ResponseHeadersTest.cs
@@ -69,16 +69,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void WriteResponseHeaders_NullNotAllowed()
{
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index 485006ebac..e7193c843b 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -51,7 +51,6 @@ namespace Grpc.Core.Tests
};
server.Start();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
@@ -67,8 +66,7 @@ namespace Grpc.Core.Tests
Assert.Greater(boundPort.BoundPort, 0);
server.Start();
- server.ShutdownAsync();
- GrpcEnvironment.Shutdown();
+ server.ShutdownAsync().Wait();
}
[Test]
@@ -83,7 +81,6 @@ namespace Grpc.Core.Tests
Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build()));
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/ShutdownTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs
new file mode 100644
index 0000000000..a2be7ddd5e
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ShutdownTest.cs
@@ -0,0 +1,77 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ShutdownTest
+ {
+ const string Host = "127.0.0.1";
+
+ MockServiceHelper helper;
+ Server server;
+ Channel channel;
+
+ [SetUp]
+ public void Init()
+ {
+ helper = new MockServiceHelper(Host);
+ server = helper.GetServer();
+ server.Start();
+ channel = helper.GetChannel();
+ }
+
+ [Test]
+ public async Task AbandonedCall()
+ {
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ await requestStream.ToListAsync();
+ });
+
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall(new CallOptions(deadline: DateTime.UtcNow.AddMilliseconds(1))));
+
+ channel.ShutdownAsync().Wait();
+ server.ShutdownAsync().Wait();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
index d875d601b9..41f661f62d 100644
--- a/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
+++ b/src/csharp/Grpc.Core.Tests/TimeoutsTest.cs
@@ -65,16 +65,10 @@ namespace Grpc.Core.Tests
[TearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
}
- [TestFixtureTearDown]
- public void CleanupClass()
- {
- GrpcEnvironment.Shutdown();
- }
-
[Test]
public void InfiniteDeadline()
{
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index bf020cd627..fb9b562c77 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -89,6 +89,24 @@ namespace Grpc.Core
}
/// <summary>
+ /// Gets the call status if the call has already finished.
+ /// Throws InvalidOperationException otherwise.
+ /// </summary>
+ public Status GetStatus()
+ {
+ return getStatusFunc();
+ }
+
+ /// <summary>
+ /// Gets the call trailing metadata if the call has already finished.
+ /// Throws InvalidOperationException otherwise.
+ /// </summary>
+ public Metadata GetTrailers()
+ {
+ return getTrailersFunc();
+ }
+
+ /// <summary>
/// Provides means to cleanup after the call.
/// If the call has already finished normally (request stream has been completed and call result has been received), doesn't do anything.
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index 0979de606f..183c84216a 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -32,8 +32,6 @@
#endregion
using System;
-using System.Runtime.CompilerServices;
-using System.Threading.Tasks;
namespace Grpc.Core
{
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index 380efcdb0e..ab2049f269 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -32,8 +32,6 @@
#endregion
using System;
-using System.Runtime.CompilerServices;
-using System.Threading.Tasks;
namespace Grpc.Core
{
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 64c6adf2bf..2f8519dfa3 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -45,14 +45,19 @@ namespace Grpc.Core
/// <summary>
/// gRPC Channel
/// </summary>
- public class Channel : IDisposable
+ public class Channel
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Channel>();
+ readonly object myLock = new object();
+ readonly AtomicCounter activeCallCounter = new AtomicCounter();
+
readonly string target;
readonly GrpcEnvironment environment;
readonly ChannelSafeHandle handle;
readonly List<ChannelOption> options;
+
+ bool shutdownRequested;
bool disposed;
/// <summary>
@@ -65,7 +70,7 @@ namespace Grpc.Core
public Channel(string target, Credentials credentials, IEnumerable<ChannelOption> options = null)
{
this.target = Preconditions.CheckNotNull(target, "target");
- this.environment = GrpcEnvironment.GetInstance();
+ this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
EnsureUserAgentChannelOption(this.options);
@@ -172,12 +177,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Destroys the underlying channel.
+ /// Waits until there are no more active calls for this channel and then cleans up
+ /// resources used by this channel.
/// </summary>
- public void Dispose()
+ public async Task ShutdownAsync()
{
- Dispose(true);
- GC.SuppressFinalize(this);
+ lock (myLock)
+ {
+ Preconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+
+ var activeCallCount = activeCallCounter.Count;
+ if (activeCallCount > 0)
+ {
+ Logger.Warning("Channel shutdown was called but there are still {0} active calls for that channel.", activeCallCount);
+ }
+
+ handle.Dispose();
+
+ await Task.Run(() => GrpcEnvironment.Release());
}
internal ChannelSafeHandle Handle
@@ -196,13 +215,20 @@ namespace Grpc.Core
}
}
- protected virtual void Dispose(bool disposing)
+ internal void AddCallReference(object call)
{
- if (disposing && handle != null && !disposed)
- {
- disposed = true;
- handle.Dispose();
- }
+ activeCallCounter.Increment();
+
+ bool success = false;
+ handle.DangerousAddRef(ref success);
+ Preconditions.CheckState(success);
+ }
+
+ internal void RemoveCallReference(object call)
+ {
+ handle.DangerousRelease();
+
+ activeCallCounter.Decrement();
}
private static void EnsureUserAgentChannelOption(List<ChannelOption> options)
diff --git a/src/csharp/Grpc.Core/ChannelOptions.cs b/src/csharp/Grpc.Core/ChannelOptions.cs
index 0cb2953f2c..ad54b46ad5 100644
--- a/src/csharp/Grpc.Core/ChannelOptions.cs
+++ b/src/csharp/Grpc.Core/ChannelOptions.cs
@@ -71,7 +71,7 @@ namespace Grpc.Core
/// Creates a channel option with an integer value.
/// </summary>
/// <param name="name">Name.</param>
- /// <param name="stringValue">String value.</param>
+ /// <param name="intValue">Integer value.</param>
public ChannelOption(string name, int intValue)
{
this.type = OptionType.Integer;
diff --git a/src/csharp/Grpc.Core/ClientBase.cs b/src/csharp/Grpc.Core/ClientBase.cs
index f240d777b9..903449439b 100644
--- a/src/csharp/Grpc.Core/ClientBase.cs
+++ b/src/csharp/Grpc.Core/ClientBase.cs
@@ -32,15 +32,15 @@
#endregion
using System;
-using System.Collections.Generic;
using System.Text.RegularExpressions;
-
-using Grpc.Core.Internal;
-using Grpc.Core.Utils;
+using System.Threading.Tasks;
namespace Grpc.Core
{
- public delegate void MetadataInterceptorDelegate(string authUri, Metadata metadata);
+ /// <summary>
+ /// Interceptor for call headers.
+ /// </summary>
+ public delegate void HeaderInterceptor(IMethod method, string authUri, Metadata metadata);
/// <summary>
/// Base class for client-side stubs.
@@ -60,10 +60,10 @@ namespace Grpc.Core
}
/// <summary>
- /// Can be used to register a custom header (initial metadata) interceptor.
- /// The delegate each time before a new call on this client is started.
+ /// Can be used to register a custom header (request metadata) interceptor.
+ /// The interceptor is invoked each time a new call on this client is started.
/// </summary>
- public MetadataInterceptorDelegate HeaderInterceptor
+ public HeaderInterceptor HeaderInterceptor
{
get;
set;
@@ -107,7 +107,7 @@ namespace Grpc.Core
options = options.WithHeaders(new Metadata());
}
var authUri = authUriBase != null ? authUriBase + method.ServiceName : null;
- interceptor(authUri, options.Headers);
+ interceptor(method, authUri, options.Headers);
}
return new CallInvocationDetails<TRequest, TResponse>(channel, method, Host, options);
}
@@ -119,7 +119,8 @@ namespace Grpc.Core
internal static string GetAuthUriBase(string target)
{
var match = ChannelTargetPattern.Match(target);
- if (!match.Success) {
+ if (!match.Success)
+ {
return null;
}
return "https://" + match.Groups[2].Value + "/";
diff --git a/src/csharp/Grpc.Core/ContextPropagationToken.cs b/src/csharp/Grpc.Core/ContextPropagationToken.cs
index 2e4bfc9e47..a5bf1b5a70 100644
--- a/src/csharp/Grpc.Core/ContextPropagationToken.cs
+++ b/src/csharp/Grpc.Core/ContextPropagationToken.cs
@@ -132,7 +132,6 @@ namespace Grpc.Core
bool propagateDeadline;
bool propagateCancellation;
-
/// <summary>
/// Creates new context propagation options.
/// </summary>
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 30d8c80235..0a44eead74 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -58,6 +58,7 @@ namespace Grpc.Core
static object staticLock = new object();
static GrpcEnvironment instance;
+ static int refCount;
static ILogger logger = new ConsoleLogger();
@@ -67,13 +68,14 @@ namespace Grpc.Core
bool isClosed;
/// <summary>
- /// Returns an instance of initialized gRPC environment.
- /// Subsequent invocations return the same instance unless Shutdown has been called first.
+ /// Returns a reference-counted instance of initialized gRPC environment.
+ /// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
/// </summary>
- internal static GrpcEnvironment GetInstance()
+ internal static GrpcEnvironment AddRef()
{
lock (staticLock)
{
+ refCount++;
if (instance == null)
{
instance = new GrpcEnvironment();
@@ -83,14 +85,16 @@ namespace Grpc.Core
}
/// <summary>
- /// Shuts down the gRPC environment if it was initialized before.
- /// Blocks until the environment has been fully shutdown.
+ /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
+ /// (and blocks until the environment has been fully shutdown).
/// </summary>
- public static void Shutdown()
+ internal static void Release()
{
lock (staticLock)
{
- if (instance != null)
+ Preconditions.CheckState(refCount > 0);
+ refCount--;
+ if (refCount == 0)
{
instance.Close();
instance = null;
@@ -125,12 +129,10 @@ namespace Grpc.Core
private GrpcEnvironment()
{
NativeLogRedirector.Redirect();
- grpcsharp_init();
+ GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
threadPool.Start();
- // TODO: use proper logging here
- Logger.Info("gRPC initialized.");
}
/// <summary>
@@ -175,6 +177,17 @@ namespace Grpc.Core
return Marshal.PtrToStringAnsi(ptr);
}
+
+ internal static void GrpcNativeInit()
+ {
+ grpcsharp_init();
+ }
+
+ internal static void GrpcNativeShutdown()
+ {
+ grpcsharp_shutdown();
+ }
+
/// <summary>
/// Shuts down this environment.
/// </summary>
@@ -185,12 +198,10 @@ namespace Grpc.Core
throw new InvalidOperationException("Close has already been called");
}
threadPool.Stop();
- grpcsharp_shutdown();
+ GrpcNativeShutdown();
isClosed = true;
debugStats.CheckOK();
-
- Logger.Info("gRPC shutdown.");
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 2c3e3d75ea..bb9ba5b8dd 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -311,9 +311,9 @@ namespace Grpc.Core.Internal
}
}
- protected override void OnReleaseResources()
+ protected override void OnAfterReleaseResources()
{
- details.Channel.Environment.DebugStats.ActiveClientCalls.Decrement();
+ details.Channel.RemoveCallReference(this);
}
private void Initialize(CompletionQueueSafeHandle cq)
@@ -323,7 +323,9 @@ namespace Grpc.Core.Internal
var call = details.Channel.Handle.CreateCall(details.Channel.Environment.CompletionRegistry,
parentCall, ContextPropagationToken.DefaultMask, cq,
details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value));
- details.Channel.Environment.DebugStats.ActiveClientCalls.Increment();
+
+ details.Channel.AddCallReference(this);
+
InitializeInternal(call);
RegisterCancellationCallback();
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 6ca4bbdafc..1808294f43 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -189,15 +189,15 @@ namespace Grpc.Core.Internal
private void ReleaseResources()
{
- OnReleaseResources();
if (call != null)
{
call.Dispose();
}
disposed = true;
+ OnAfterReleaseResources();
}
- protected virtual void OnReleaseResources()
+ protected virtual void OnAfterReleaseResources()
{
}
@@ -212,7 +212,7 @@ namespace Grpc.Core.Internal
Preconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
- protected void CheckReadingAllowed()
+ protected virtual void CheckReadingAllowed()
{
Preconditions.CheckState(started);
Preconditions.CheckState(!disposed);
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index 3710a65d6b..6278c0191e 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -50,16 +50,19 @@ namespace Grpc.Core.Internal
readonly TaskCompletionSource<object> finishedServersideTcs = new TaskCompletionSource<object>();
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
readonly GrpcEnvironment environment;
+ readonly Server server;
- public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment) : base(serializer, deserializer)
+ public AsyncCallServer(Func<TResponse, byte[]> serializer, Func<byte[], TRequest> deserializer, GrpcEnvironment environment, Server server) : base(serializer, deserializer)
{
this.environment = Preconditions.CheckNotNull(environment);
+ this.server = Preconditions.CheckNotNull(server);
}
public void Initialize(CallSafeHandle call)
{
call.SetCompletionRegistry(environment.CompletionRegistry);
- environment.DebugStats.ActiveServerCalls.Increment();
+
+ server.AddCallReference(this);
InitializeInternal(call);
}
@@ -168,9 +171,15 @@ namespace Grpc.Core.Internal
}
}
- protected override void OnReleaseResources()
+ protected override void CheckReadingAllowed()
+ {
+ base.CheckReadingAllowed();
+ Preconditions.CheckArgument(!cancelRequested);
+ }
+
+ protected override void OnAfterReleaseResources()
{
- environment.DebugStats.ActiveServerCalls.Decrement();
+ server.RemoveCallReference(this);
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 6a2add54db..3a96414bea 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -134,7 +134,7 @@ namespace Grpc.Core.Internal
}
// Gets data of server_rpc_new completion.
- public ServerRpcNew GetServerRpcNew()
+ public ServerRpcNew GetServerRpcNew(Server server)
{
var call = grpcsharp_batch_context_server_rpc_new_call(this);
@@ -145,7 +145,7 @@ namespace Grpc.Core.Internal
IntPtr metadataArrayPtr = grpcsharp_batch_context_server_rpc_new_request_metadata(this);
var metadata = MetadataArraySafeHandle.ReadMetadataFromPtrUnsafe(metadataArrayPtr);
- return new ServerRpcNew(call, method, host, deadline, metadata);
+ return new ServerRpcNew(server, call, method, host, deadline, metadata);
}
// Gets data of receive_close_on_server completion.
@@ -198,14 +198,16 @@ namespace Grpc.Core.Internal
/// </summary>
internal struct ServerRpcNew
{
+ readonly Server server;
readonly CallSafeHandle call;
readonly string method;
readonly string host;
readonly Timespec deadline;
readonly Metadata requestMetadata;
- public ServerRpcNew(CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
+ public ServerRpcNew(Server server, CallSafeHandle call, string method, string host, Timespec deadline, Metadata requestMetadata)
{
+ this.server = server;
this.call = call;
this.method = method;
this.host = host;
@@ -213,6 +215,14 @@ namespace Grpc.Core.Internal
this.requestMetadata = requestMetadata;
}
+ public Server Server
+ {
+ get
+ {
+ return this.server;
+ }
+ }
+
public CallSafeHandle Call
{
get
diff --git a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
index 7f03bf4ea5..8cef566c14 100644
--- a/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ChannelSafeHandle.cs
@@ -68,11 +68,17 @@ namespace Grpc.Core.Internal
public static ChannelSafeHandle CreateInsecure(string target, ChannelArgsSafeHandle channelArgs)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_insecure_channel_create(target, channelArgs);
}
public static ChannelSafeHandle CreateSecure(CredentialsSafeHandle credentials, string target, ChannelArgsSafeHandle channelArgs)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_secure_channel_create(credentials, target, channelArgs);
}
@@ -107,6 +113,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_channel_destroy(handle);
+ GrpcEnvironment.GrpcNativeShutdown();
return true;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/DebugStats.cs b/src/csharp/Grpc.Core/Internal/DebugStats.cs
index 8793450ff3..1bea1adf9e 100644
--- a/src/csharp/Grpc.Core/Internal/DebugStats.cs
+++ b/src/csharp/Grpc.Core/Internal/DebugStats.cs
@@ -38,10 +38,6 @@ namespace Grpc.Core.Internal
{
internal class DebugStats
{
- public readonly AtomicCounter ActiveClientCalls = new AtomicCounter();
-
- public readonly AtomicCounter ActiveServerCalls = new AtomicCounter();
-
public readonly AtomicCounter PendingBatchCompletions = new AtomicCounter();
/// <summary>
@@ -49,16 +45,6 @@ namespace Grpc.Core.Internal
/// </summary>
public void CheckOK()
{
- var remainingClientCalls = ActiveClientCalls.Count;
- if (remainingClientCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} client calls that weren't disposed properly.", remainingClientCalls));
- }
- var remainingServerCalls = ActiveServerCalls.Count;
- if (remainingServerCalls != 0)
- {
- DebugWarning(string.Format("Detected {0} server calls that weren't disposed properly.", remainingServerCalls));
- }
var pendingBatchCompletions = PendingBatchCompletions.Count;
if (pendingBatchCompletions != 0)
{
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index cb4c7c821e..4b7124ee74 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -83,8 +83,6 @@ namespace Grpc.Core.Internal
lock (myLock)
{
cq.Shutdown();
-
- Logger.Info("Waiting for GRPC threads to finish.");
foreach (var thread in threads)
{
thread.Join();
@@ -136,7 +134,6 @@ namespace Grpc.Core.Internal
}
}
while (ev.type != GRPCCompletionType.Shutdown);
- Logger.Info("Completion queue has shutdown successfully, thread {0} exiting.", Thread.CurrentThread.Name);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
index 427c16fac6..83994f6762 100644
--- a/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/MetadataArraySafeHandle.cs
@@ -70,7 +70,8 @@ namespace Grpc.Core.Internal
var metadataArray = grpcsharp_metadata_array_create(new UIntPtr((ulong)metadata.Count));
for (int i = 0; i < metadata.Count; i++)
{
- grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, metadata[i].ValueBytes, new UIntPtr((ulong)metadata[i].ValueBytes.Length));
+ var valueBytes = metadata[i].GetSerializedValueUnsafe();
+ grpcsharp_metadata_array_add(metadataArray, metadata[i].Key, valueBytes, new UIntPtr((ulong)valueBytes.Length));
}
return metadataArray;
}
@@ -94,7 +95,7 @@ namespace Grpc.Core.Internal
string key = Marshal.PtrToStringAnsi(grpcsharp_metadata_array_get_key(metadataArray, index));
var bytes = new byte[grpcsharp_metadata_array_get_value_length(metadataArray, index).ToUInt64()];
Marshal.Copy(grpcsharp_metadata_array_get_value(metadataArray, index), bytes, 0, bytes.Length);
- metadata.Add(new Metadata.Entry(key, bytes));
+ metadata.Add(Metadata.Entry.CreateUnsafe(key, bytes));
}
return metadata;
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 688f9f6fec..59f4c5727c 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -67,7 +67,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -123,7 +123,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -179,7 +179,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -239,7 +239,7 @@ namespace Grpc.Core.Internal
var asyncCall = new AsyncCallServer<TRequest, TResponse>(
method.ResponseMarshaller.Serializer,
method.RequestMarshaller.Deserializer,
- environment);
+ environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
@@ -278,7 +278,7 @@ namespace Grpc.Core.Internal
{
// We don't care about the payload type here.
var asyncCall = new AsyncCallServer<byte[], byte[]>(
- (payload) => payload, (payload) => payload, environment);
+ (payload) => payload, (payload) => payload, environment, newRpc.Server);
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index f9b44b1acf..5ee7ac14e8 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -74,6 +74,9 @@ namespace Grpc.Core.Internal
public static ServerSafeHandle NewServer(CompletionQueueSafeHandle cq, ChannelArgsSafeHandle args)
{
+ // Increment reference count for the native gRPC environment to make sure we don't do grpc_shutdown() before destroying the server handle.
+ // Doing so would make object finalizer crash if we end up abandoning the handle.
+ GrpcEnvironment.GrpcNativeInit();
return grpcsharp_server_create(cq, args);
}
@@ -109,6 +112,7 @@ namespace Grpc.Core.Internal
protected override bool ReleaseHandle()
{
grpcsharp_server_destroy(handle);
+ GrpcEnvironment.GrpcNativeShutdown();
return true;
}
diff --git a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
index 382481d871..35561d25d8 100644
--- a/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
+++ b/src/csharp/Grpc.Core/Logging/ConsoleLogger.cs
@@ -51,7 +51,19 @@ namespace Grpc.Core.Logging
private ConsoleLogger(Type forType)
{
this.forType = forType;
- this.forTypeString = forType != null ? forType.FullName + " " : "";
+ if (forType != null)
+ {
+ var namespaceStr = forType.Namespace ?? "";
+ if (namespaceStr.Length > 0)
+ {
+ namespaceStr += ".";
+ }
+ this.forTypeString = namespaceStr + forType.Name + " ";
+ }
+ else
+ {
+ this.forTypeString = "";
+ }
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/Metadata.cs b/src/csharp/Grpc.Core/Metadata.cs
index 9db2abf46e..a589b50caa 100644
--- a/src/csharp/Grpc.Core/Metadata.cs
+++ b/src/csharp/Grpc.Core/Metadata.cs
@@ -46,6 +46,11 @@ namespace Grpc.Core
public sealed class Metadata : IList<Metadata.Entry>
{
/// <summary>
+ /// All binary headers should have this suffix.
+ /// </summary>
+ public const string BinaryHeaderSuffix = "-bin";
+
+ /// <summary>
/// An read-only instance of metadata containing no entries.
/// </summary>
public static readonly Metadata Empty = new Metadata().Freeze();
@@ -181,23 +186,49 @@ namespace Grpc.Core
private static readonly Encoding Encoding = Encoding.ASCII;
readonly string key;
- string value;
- byte[] valueBytes;
+ readonly string value;
+ readonly byte[] valueBytes;
+
+ private Entry(string key, string value, byte[] valueBytes)
+ {
+ this.key = key;
+ this.value = value;
+ this.valueBytes = valueBytes;
+ }
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct with a binary value.
+ /// </summary>
+ /// <param name="key">Metadata key, needs to have suffix indicating a binary valued metadata entry.</param>
+ /// <param name="valueBytes">Value bytes.</param>
public Entry(string key, byte[] valueBytes)
{
- this.key = Preconditions.CheckNotNull(key, "key");
+ this.key = NormalizeKey(key);
+ Preconditions.CheckArgument(this.key.EndsWith(BinaryHeaderSuffix),
+ "Key for binary valued metadata entry needs to have suffix indicating binary value.");
this.value = null;
- this.valueBytes = Preconditions.CheckNotNull(valueBytes, "valueBytes");
+ Preconditions.CheckNotNull(valueBytes, "valueBytes");
+ this.valueBytes = new byte[valueBytes.Length];
+ Buffer.BlockCopy(valueBytes, 0, this.valueBytes, 0, valueBytes.Length); // defensive copy to guarantee immutability
}
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Grpc.Core.Metadata+Entry"/> struct holding an ASCII value.
+ /// </summary>
+ /// <param name="key">Metadata key, must not use suffix indicating a binary valued metadata entry.</param>
+ /// <param name="value">Value string. Only ASCII characters are allowed.</param>
public Entry(string key, string value)
{
- this.key = Preconditions.CheckNotNull(key, "key");
+ this.key = NormalizeKey(key);
+ Preconditions.CheckArgument(!this.key.EndsWith(BinaryHeaderSuffix),
+ "Key for ASCII valued metadata entry cannot have suffix indicating binary value.");
this.value = Preconditions.CheckNotNull(value, "value");
this.valueBytes = null;
}
+ /// <summary>
+ /// Gets the metadata entry key.
+ /// </summary>
public string Key
{
get
@@ -206,33 +237,86 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Gets the binary value of this metadata entry.
+ /// </summary>
public byte[] ValueBytes
{
get
{
if (valueBytes == null)
{
- valueBytes = Encoding.GetBytes(value);
+ return Encoding.GetBytes(value);
}
- return valueBytes;
+
+ // defensive copy to guarantee immutability
+ var bytes = new byte[valueBytes.Length];
+ Buffer.BlockCopy(valueBytes, 0, bytes, 0, valueBytes.Length);
+ return bytes;
}
}
+ /// <summary>
+ /// Gets the string value of this metadata entry.
+ /// </summary>
public string Value
{
get
{
- if (value == null)
- {
- value = Encoding.GetString(valueBytes);
- }
- return value;
+ Preconditions.CheckState(!IsBinary, "Cannot access string value of a binary metadata entry");
+ return value ?? Encoding.GetString(valueBytes);
}
}
-
+
+ /// <summary>
+ /// Returns <c>true</c> if this entry is a binary-value entry.
+ /// </summary>
+ public bool IsBinary
+ {
+ get
+ {
+ return value == null;
+ }
+ }
+
+ /// <summary>
+ /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Metadata+Entry"/>.
+ /// </summary>
public override string ToString()
{
- return string.Format("[Entry: key={0}, value={1}]", Key, Value);
+ if (IsBinary)
+ {
+ return string.Format("[Entry: key={0}, valueBytes={1}]", key, valueBytes);
+ }
+
+ return string.Format("[Entry: key={0}, value={1}]", key, value);
+ }
+
+ /// <summary>
+ /// Gets the serialized value for this entry. For binary metadata entries, this leaks
+ /// the internal <c>valueBytes</c> byte array and caller must not change contents of it.
+ /// </summary>
+ internal byte[] GetSerializedValueUnsafe()
+ {
+ return valueBytes ?? Encoding.GetBytes(value);
+ }
+
+ /// <summary>
+ /// Creates a binary value or ascii value metadata entry from data received from the native layer.
+ /// We trust C core to give us well-formed data, so we don't perform any checks or defensive copying.
+ /// </summary>
+ internal static Entry CreateUnsafe(string key, byte[] valueBytes)
+ {
+ if (key.EndsWith(BinaryHeaderSuffix))
+ {
+ return new Entry(key, null, valueBytes);
+ }
+ return new Entry(key, Encoding.GetString(valueBytes), null);
+ }
+
+ private static string NormalizeKey(string key)
+ {
+ return Preconditions.CheckNotNull(key, "key").ToLower();
}
}
}
diff --git a/src/csharp/Grpc.Core/Method.cs b/src/csharp/Grpc.Core/Method.cs
index 4c208b4a26..4c53285893 100644
--- a/src/csharp/Grpc.Core/Method.cs
+++ b/src/csharp/Grpc.Core/Method.cs
@@ -55,9 +55,36 @@ namespace Grpc.Core
}
/// <summary>
+ /// A non-generic representation of a remote method.
+ /// </summary>
+ public interface IMethod
+ {
+ /// <summary>
+ /// Gets the type of the method.
+ /// </summary>
+ MethodType Type { get; }
+
+ /// <summary>
+ /// Gets the name of the service to which this method belongs.
+ /// </summary>
+ string ServiceName { get; }
+
+ /// <summary>
+ /// Gets the unqualified name of the method.
+ /// </summary>
+ string Name { get; }
+
+ /// <summary>
+ /// Gets the fully qualified name of the method. On the server side, methods are dispatched
+ /// based on this name.
+ /// </summary>
+ string FullName { get; }
+ }
+
+ /// <summary>
/// A description of a remote method.
/// </summary>
- public class Method<TRequest, TResponse>
+ public class Method<TRequest, TResponse> : IMethod
{
readonly MethodType type;
readonly string serviceName;
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index c76f126026..28f1686e20 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -50,6 +50,8 @@ namespace Grpc.Core
{
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
+ readonly AtomicCounter activeCallCounter = new AtomicCounter();
+
readonly ServiceDefinitionCollection serviceDefinitions;
readonly ServerPortCollection ports;
readonly GrpcEnvironment environment;
@@ -73,7 +75,7 @@ namespace Grpc.Core
{
this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this);
- this.environment = GrpcEnvironment.GetInstance();
+ this.environment = GrpcEnvironment.AddRef();
this.options = options != null ? new List<ChannelOption>(options) : new List<ChannelOption>();
using (var channelArgs = ChannelOptions.CreateChannelArgs(this.options))
{
@@ -106,6 +108,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// To allow awaiting termination of the server.
+ /// </summary>
+ public Task ShutdownTask
+ {
+ get
+ {
+ return shutdownTcs.Task;
+ }
+ }
+
+ /// <summary>
/// Starts the server.
/// </summary>
public void Start()
@@ -136,18 +149,9 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
await shutdownTcs.Task;
- handle.Dispose();
- }
+ DisposeHandle();
- /// <summary>
- /// To allow awaiting termination of the server.
- /// </summary>
- public Task ShutdownTask
- {
- get
- {
- return shutdownTcs.Task;
- }
+ await Task.Run(() => GrpcEnvironment.Release());
}
/// <summary>
@@ -166,7 +170,22 @@ namespace Grpc.Core
handle.ShutdownAndNotify(HandleServerShutdown, environment);
handle.CancelAllCalls();
await shutdownTcs.Task;
- handle.Dispose();
+ DisposeHandle();
+ }
+
+ internal void AddCallReference(object call)
+ {
+ activeCallCounter.Increment();
+
+ bool success = false;
+ handle.DangerousAddRef(ref success);
+ Preconditions.CheckState(success);
+ }
+
+ internal void RemoveCallReference(object call)
+ {
+ handle.DangerousRelease();
+ activeCallCounter.Decrement();
}
/// <summary>
@@ -227,6 +246,16 @@ namespace Grpc.Core
}
}
+ private void DisposeHandle()
+ {
+ var activeCallCount = activeCallCounter.Count;
+ if (activeCallCount > 0)
+ {
+ Logger.Warning("Server shutdown has finished but there are still {0} active calls for that server.", activeCallCount);
+ }
+ handle.Dispose();
+ }
+
/// <summary>
/// Selects corresponding handler for given call and handles the call.
/// </summary>
@@ -254,7 +283,7 @@ namespace Grpc.Core
{
if (success)
{
- ServerRpcNew newRpc = ctx.GetServerRpcNew();
+ ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
diff --git a/src/csharp/Grpc.Examples.MathClient/MathClient.cs b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
index f9839d99f1..abd95cb905 100644
--- a/src/csharp/Grpc.Examples.MathClient/MathClient.cs
+++ b/src/csharp/Grpc.Examples.MathClient/MathClient.cs
@@ -39,23 +39,21 @@ namespace math
{
public static void Main(string[] args)
{
- using (Channel channel = new Channel("127.0.0.1", 23456, Credentials.Insecure))
- {
- Math.IMathClient client = new Math.MathClient(channel);
- MathExamples.DivExample(client);
+ var channel = new Channel("127.0.0.1", 23456, Credentials.Insecure);
+ Math.IMathClient client = new Math.MathClient(channel);
+ MathExamples.DivExample(client);
- MathExamples.DivAsyncExample(client).Wait();
+ MathExamples.DivAsyncExample(client).Wait();
- MathExamples.FibExample(client).Wait();
+ MathExamples.FibExample(client).Wait();
- MathExamples.SumExample(client).Wait();
+ MathExamples.SumExample(client).Wait();
- MathExamples.DivManyExample(client).Wait();
+ MathExamples.DivManyExample(client).Wait();
- MathExamples.DependendRequestsExample(client).Wait();
- }
+ MathExamples.DependendRequestsExample(client).Wait();
- GrpcEnvironment.Shutdown();
+ channel.ShutdownAsync().Wait();
}
}
}
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index 5f7e717b0c..26bef646ec 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -56,7 +56,6 @@ namespace math
Console.ReadKey();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
}
}
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index fdef950f09..36c1c947bd 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -68,9 +68,8 @@ namespace math.Tests
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
diff --git a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
index 024377e216..80c35fb197 100644
--- a/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
+++ b/src/csharp/Grpc.HealthCheck.Tests/HealthClientServerTest.cs
@@ -71,10 +71,9 @@ namespace Grpc.HealthCheck.Tests
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 385ca92086..24c22273fb 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -37,13 +37,15 @@ using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
+using Google.Apis.Auth.OAuth2;
using Google.ProtocolBuffers;
+
using grpc.testing;
using Grpc.Auth;
using Grpc.Core;
using Grpc.Core.Utils;
+
using NUnit.Framework;
-using Google.Apis.Auth.OAuth2;
namespace Grpc.IntegrationTesting
{
@@ -118,12 +120,10 @@ namespace Grpc.IntegrationTesting
};
}
- using (Channel channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions))
- {
- TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
- await RunTestCaseAsync(options.testCase, client);
- }
- GrpcEnvironment.Shutdown();
+ var channel = new Channel(options.serverHost, options.serverPort.Value, credentials, channelOptions);
+ TestService.TestServiceClient client = new TestService.TestServiceClient(channel);
+ await RunTestCaseAsync(options.testCase, client);
+ channel.ShutdownAsync().Wait();
}
private async Task RunTestCaseAsync(string testCase, TestService.TestServiceClient client)
@@ -169,6 +169,9 @@ namespace Grpc.IntegrationTesting
case "cancel_after_first_response":
await RunCancelAfterFirstResponseAsync(client);
break;
+ case "timeout_on_sleeping_server":
+ await RunTimeoutOnSleepingServerAsync(client);
+ break;
case "benchmark_empty_unary":
RunBenchmarkEmptyUnary(client);
break;
@@ -308,7 +311,7 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running service_account_creds");
var credential = await GoogleCredential.GetApplicationDefaultAsync();
credential = credential.CreateScoped(new[] { AuthScope });
- client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
+ client.HeaderInterceptor = AuthInterceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
@@ -332,7 +335,7 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("running compute_engine_creds");
var credential = await GoogleCredential.GetApplicationDefaultAsync();
Assert.IsFalse(credential.IsCreateScopedRequired);
- client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
+ client.HeaderInterceptor = AuthInterceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
@@ -357,7 +360,7 @@ namespace Grpc.IntegrationTesting
var credential = await GoogleCredential.GetApplicationDefaultAsync();
// check this a credential with scope support, but don't add the scope.
Assert.IsTrue(credential.IsCreateScopedRequired);
- client.HeaderInterceptor = OAuth2Interceptors.FromCredential(credential);
+ client.HeaderInterceptor = AuthInterceptors.FromCredential(credential);
var request = SimpleRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
@@ -381,7 +384,7 @@ namespace Grpc.IntegrationTesting
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { AuthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
- client.HeaderInterceptor = OAuth2Interceptors.FromAccessToken(oauth2Token);
+ client.HeaderInterceptor = AuthInterceptors.FromAccessToken(oauth2Token);
var request = SimpleRequest.CreateBuilder()
.SetFillUsername(true)
@@ -401,7 +404,7 @@ namespace Grpc.IntegrationTesting
ITokenAccess credential = (await GoogleCredential.GetApplicationDefaultAsync()).CreateScoped(new[] { AuthScope });
string oauth2Token = await credential.GetAccessTokenForRequestAsync();
- var headerInterceptor = OAuth2Interceptors.FromAccessToken(oauth2Token);
+ var headerInterceptor = AuthInterceptors.FromAccessToken(oauth2Token);
var request = SimpleRequest.CreateBuilder()
.SetFillUsername(true)
@@ -409,7 +412,7 @@ namespace Grpc.IntegrationTesting
.Build();
var headers = new Metadata();
- headerInterceptor("", headers);
+ headerInterceptor(null, "", headers);
var response = client.UnaryCall(request, headers: headers);
Assert.AreEqual(AuthScopeResponse, response.OauthScope);
@@ -458,6 +461,29 @@ namespace Grpc.IntegrationTesting
Console.WriteLine("Passed!");
}
+ public static async Task RunTimeoutOnSleepingServerAsync(TestService.ITestServiceClient client)
+ {
+ Console.WriteLine("running timeout_on_sleeping_server");
+
+ var deadline = DateTime.UtcNow.AddMilliseconds(1);
+ using (var call = client.FullDuplexCall(deadline: deadline))
+ {
+ try
+ {
+ await call.RequestStream.WriteAsync(StreamingOutputCallRequest.CreateBuilder()
+ .SetPayload(CreateZerosPayload(27182)).Build());
+ }
+ catch (InvalidOperationException)
+ {
+ // Deadline was reached before write has started. Eat the exception and continue.
+ }
+
+ var ex = Assert.Throws<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.DeadlineExceeded, ex.Status.StatusCode);
+ }
+ Console.WriteLine("Passed!");
+ }
+
// This is not an official interop test, but it's useful.
public static void RunBenchmarkEmptyUnary(TestService.ITestServiceClient client)
{
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
index 6fa721bc1c..f3158aeb45 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs
@@ -75,9 +75,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
@@ -127,5 +126,11 @@ namespace Grpc.IntegrationTesting
{
await InteropClient.RunCancelAfterFirstResponseAsync(client);
}
+
+ [Test]
+ public async Task TimeoutOnSleepingServerAsync()
+ {
+ await InteropClient.RunTimeoutOnSleepingServerAsync(client);
+ }
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
index 504fd11857..0cc8b2cde1 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropServer.cs
@@ -107,8 +107,6 @@ namespace Grpc.IntegrationTesting
server.Start();
server.ShutdownTask.Wait();
-
- GrpcEnvironment.Shutdown();
}
private static ServerOptions ParseArguments(string[] args)
diff --git a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
index 1c398eb84e..842795374f 100644
--- a/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/SslCredentialsTest.cs
@@ -85,9 +85,8 @@ namespace Grpc.IntegrationTesting
[TestFixtureTearDown]
public void Cleanup()
{
- channel.Dispose();
+ channel.ShutdownAsync().Wait();
server.ShutdownAsync().Wait();
- GrpcEnvironment.Shutdown();
}
[Test]
diff --git a/src/node/README.md b/src/node/README.md
index 7d3d8c7fa1..b6411537c7 100644
--- a/src/node/README.md
+++ b/src/node/README.md
@@ -5,11 +5,35 @@ Alpha : Ready for early adopters
## PREREQUISITES
- `node`: This requires `node` to be installed. If you instead have the `nodejs` executable on Debian, you should install the [`nodejs-legacy`](https://packages.debian.org/sid/nodejs-legacy) package.
-- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
## INSTALLATION
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
-Run the following command to install gRPC Node.js.
+
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC NPM package
+
+```sh
+npm install grpc
+```
+
+**Mac OS X**
+
+Install [homebrew][]. Run the following command to install gRPC Node.js.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s nodejs
```
@@ -88,5 +112,5 @@ ServerCredentials
An object with factory methods for creating credential objects for servers.
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index a79a47427f..18858fa334 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -207,6 +207,13 @@ class SendMessageOp : public Op {
if (!::node::Buffer::HasInstance(value)) {
return false;
}
+ Handle<Object> object_value = value->ToObject();
+ if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) {
+ Handle<Value> flag_value = object_value->Get(NanNew("grpcWriteFlags"));
+ if (flag_value->IsUint32()) {
+ out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK;
+ }
+ }
out->data.send_message = BufferToByteBuffer(value);
Persistent<Value> *handle = new Persistent<Value>();
NanAssignPersistent(*handle, value);
@@ -457,10 +464,6 @@ void Call::Init(Handle<Object> exports) {
NanNew<FunctionTemplate>(GetPeer)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
Handle<Function> ctr = tpl->GetFunction();
- ctr->Set(NanNew("WRITE_BUFFER_HINT"),
- NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
- ctr->Set(NanNew("WRITE_NO_COMPRESS"),
- NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
exports->Set(NanNew("Call"), ctr);
constructor = new NanCallback(ctr);
}
@@ -620,7 +623,7 @@ NAN_METHOD(Call::StartBatch) {
call->wrapped_call, &ops[0], nops, new struct tag(
callback, op_vector.release(), resources), NULL);
if (error != GRPC_CALL_OK) {
- return NanThrowError("startBatch failed", error);
+ return NanThrowError(nanErrorWithCode("startBatch failed", error));
}
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
@@ -634,7 +637,7 @@ NAN_METHOD(Call::Cancel) {
Call *call = ObjectWrap::Unwrap<Call>(args.This());
grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
if (error != GRPC_CALL_OK) {
- return NanThrowError("cancel failed", error);
+ return NanThrowError(nanErrorWithCode("cancel failed", error));
}
NanReturnUndefined();
}
diff --git a/src/node/ext/call.h b/src/node/ext/call.h
index 6acda76197..ef6e5fcd21 100644
--- a/src/node/ext/call.h
+++ b/src/node/ext/call.h
@@ -51,6 +51,19 @@ namespace node {
using std::unique_ptr;
using std::shared_ptr;
+/**
+ * Helper function for throwing errors with a grpc_call_error value.
+ * Modified from the answer by Gus Goose to
+ * http://stackoverflow.com/questions/31794200.
+ */
+inline v8::Local<v8::Value> nanErrorWithCode(const char *msg,
+ grpc_call_error code) {
+ NanEscapableScope();
+ v8::Local<v8::Object> err = NanError(msg).As<v8::Object>();
+ err->Set(NanNew("code"), NanNew<v8::Uint32>(code));
+ return NanEscapeScope(err);
+}
+
v8::Handle<v8::Value> ParseMetadata(const grpc_metadata_array *metadata_array);
class PersistentHolder {
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index d93dafda79..0cf30da922 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -196,6 +196,16 @@ void InitConnectivityStateConstants(Handle<Object> exports) {
channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE);
}
+void InitWriteFlags(Handle<Object> exports) {
+ NanScope();
+ Handle<Object> write_flags = NanNew<Object>();
+ exports->Set(NanNew("writeFlags"), write_flags);
+ Handle<Value> BUFFER_HINT(NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
+ write_flags->Set(NanNew("BUFFER_HINT"), BUFFER_HINT);
+ Handle<Value> NO_COMPRESS(NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
+ write_flags->Set(NanNew("NO_COMPRESS"), NO_COMPRESS);
+}
+
void init(Handle<Object> exports) {
NanScope();
grpc_init();
@@ -204,6 +214,7 @@ void init(Handle<Object> exports) {
InitOpTypeConstants(exports);
InitPropagateConstants(exports);
InitConnectivityStateConstants(exports);
+ InitWriteFlags(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index 57c4310490..32a8ff55b1 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -233,7 +233,7 @@ NAN_METHOD(Server::RequestCall) {
new struct tag(new NanCallback(args[0].As<Function>()), ops.release(),
shared_ptr<Resources>(nullptr)));
if (error != GRPC_CALL_OK) {
- return NanThrowError("requestCall failed", error);
+ return NanThrowError(nanErrorWithCode("requestCall failed", error));
}
CompletionQueueAsyncWorker::Next();
NanReturnUndefined();
diff --git a/src/node/health_check/health.js b/src/node/health_check/health.js
index 87e00197fe..84d7e0568e 100644
--- a/src/node/health_check/health.js
+++ b/src/node/health_check/health.js
@@ -45,17 +45,13 @@ function HealthImplementation(statusMap) {
this.statusMap = _.clone(statusMap);
}
-HealthImplementation.prototype.setStatus = function(host, service, status) {
- if (!this.statusMap[host]) {
- this.statusMap[host] = {};
- }
- this.statusMap[host][service] = status;
+HealthImplementation.prototype.setStatus = function(service, status) {
+ this.statusMap[service] = status;
};
HealthImplementation.prototype.check = function(call, callback){
- var host = call.request.host;
var service = call.request.service;
- var status = _.get(this.statusMap, [host, service], null);
+ var status = _.get(this.statusMap, service, null);
if (status === null) {
callback({code:grpc.status.NOT_FOUND});
} else {
diff --git a/src/node/health_check/health.proto b/src/node/health_check/health.proto
index d31df1e0a7..57f4aaa9c0 100644
--- a/src/node/health_check/health.proto
+++ b/src/node/health_check/health.proto
@@ -32,8 +32,7 @@ syntax = "proto3";
package grpc.health.v1alpha;
message HealthCheckRequest {
- string host = 1;
- string service = 2;
+ string service = 1;
}
message HealthCheckResponse {
@@ -47,4 +46,4 @@ message HealthCheckResponse {
service Health {
rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
-} \ No newline at end of file
+}
diff --git a/src/node/index.js b/src/node/index.js
index 93c65ac5c4..889b0ac0e9 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -145,6 +145,11 @@ exports.propagate = grpc.propagate;
exports.callError = grpc.callError;
/**
+ * Write flag name to code number mapping
+ */
+exports.writeFlags = grpc.writeFlags;
+
+/**
* Credentials factories
*/
exports.Credentials = grpc.Credentials;
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 50cbf4a133..7b7eae51d2 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -79,13 +79,19 @@ function ClientWritableStream(call, serialize) {
* implementation of a method needed for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk to write
- * @param {string} encoding Ignored
+ * @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Called when the write is complete
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
- batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ var message = this.serialize(chunk);
+ if (_.isFinite(encoding)) {
+ /* Attach the encoding if it is a finite number. This is the closest we
+ * can get to checking that it is valid flags */
+ message.grpcWriteFlags = encoding;
+ }
+ batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, event) {
if (err) {
// Something has gone wrong. Stop writing by failing to call callback
@@ -273,8 +279,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
return;
}
var client_batch = {};
+ var message = serialize(argument);
+ if (options) {
+ message.grpcWriteFlags = options.flags;
+ }
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
- client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
@@ -407,9 +417,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
return;
}
var start_batch = {};
+ var message = serialize(argument);
+ if (options) {
+ message.grpcWriteFlags = options.flags;
+ }
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
- start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
diff --git a/src/node/src/server.js b/src/node/src/server.js
index f2520c3c97..137f60ed12 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -115,8 +115,10 @@ function waitForCancel(call, emitter) {
* @param {function(*):Buffer=} serialize Serialization function for the
* response
* @param {Object=} metadata Optional trailing metadata to send with status
+ * @param {number=} flags Flags for modifying how the message is sent.
+ * Defaults to 0.
*/
-function sendUnaryResponse(call, value, serialize, metadata) {
+function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {};
var status = {
code: grpc.status.OK,
@@ -130,7 +132,9 @@ function sendUnaryResponse(call, value, serialize, metadata) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
call.metadataSent = true;
}
- end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
+ var message = serialize(value);
+ message.grpcWriteFlags = flags;
+ end_batch[grpc.opType.SEND_MESSAGE] = message;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
}
@@ -254,7 +258,7 @@ function ServerWritableStream(call, serialize) {
* for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk of data to write
- * @param {string} encoding Ignored
+ * @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Callback to indicate that the write is
* complete
*/
@@ -265,7 +269,13 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
this.call.metadataSent = true;
}
- batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ var message = this.serialize(chunk);
+ if (_.isFinite(encoding)) {
+ /* Attach the encoding if it is a finite number. This is the closest we
+ * can get to checking that it is valid flags */
+ message.grpcWriteFlags = encoding;
+ }
+ batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, value) {
if (err) {
this.emit('error', err);
@@ -450,14 +460,14 @@ function handleUnary(call, handler, metadata) {
if (emitter.cancelled) {
return;
}
- handler.func(emitter, function sendUnaryData(err, value, trailer) {
+ handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
if (err) {
if (trailer) {
err.metadata = trailer;
}
handleError(call, err);
} else {
- sendUnaryResponse(call, value, handler.serialize, trailer);
+ sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
});
@@ -514,7 +524,7 @@ function handleClientStreaming(call, handler, metadata) {
});
waitForCancel(call, stream);
stream.metadata = metadata;
- handler.func(stream, function(err, value, trailer) {
+ handler.func(stream, function(err, value, trailer, flags) {
stream.terminate();
if (err) {
if (trailer) {
@@ -522,7 +532,7 @@ function handleClientStreaming(call, handler, metadata) {
}
handleError(call, err);
} else {
- sendUnaryResponse(call, value, handler.serialize, trailer);
+ sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
}
diff --git a/src/node/test/health_test.js b/src/node/test/health_test.js
index 22c58d3956..9267bff7eb 100644
--- a/src/node/test/health_test.js
+++ b/src/node/test/health_test.js
@@ -41,13 +41,9 @@ var grpc = require('../');
describe('Health Checking', function() {
var statusMap = {
- '': {
- '': 'SERVING',
- 'grpc.test.TestService': 'NOT_SERVING',
- },
- virtual_host: {
- 'grpc.test.TestService': 'SERVING'
- }
+ '': 'SERVING',
+ 'grpc.test.TestServiceNotServing': 'NOT_SERVING',
+ 'grpc.test.TestServiceServing': 'SERVING'
};
var healthServer = new grpc.Server();
healthServer.addProtoService(health.service,
@@ -71,15 +67,15 @@ describe('Health Checking', function() {
});
});
it('should say that a disabled service is NOT_SERVING', function(done) {
- healthClient.check({service: 'grpc.test.TestService'},
+ healthClient.check({service: 'grpc.test.TestServiceNotServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'NOT_SERVING');
done();
});
});
- it('should say that a service on another host is SERVING', function(done) {
- healthClient.check({host: 'virtual_host', service: 'grpc.test.TestService'},
+ it('should say that an enabled service is SERVING', function(done) {
+ healthClient.check({service: 'grpc.test.TestServiceServing'},
function(err, response) {
assert.ifError(err);
assert.strictEqual(response.status, 'SERVING');
@@ -93,12 +89,4 @@ describe('Health Checking', function() {
done();
});
});
- it('should get NOT_FOUND if the host is not registered', function(done) {
- healthClient.check({host: 'wrong_host', service: 'grpc.test.TestService'},
- function(err, response) {
- assert(err);
- assert.strictEqual(err.code, grpc.status.NOT_FOUND);
- done();
- });
- });
});
diff --git a/src/php/README.md b/src/php/README.md
index 1804606e09..f432935fde 100644
--- a/src/php/README.md
+++ b/src/php/README.md
@@ -7,17 +7,17 @@ This directory contains source code for PHP implementation of gRPC layered on sh
Alpha : Ready for early adopters
-## ENVIRONMENT
+## Environment
Prerequisite: PHP 5.5 or later, `phpunit`, `pecl`
-Linux:
+**Linux:**
```sh
$ sudo apt-get install php5 php5-dev phpunit php-pear
```
-OS X:
+**Mac OS X:**
```sh
$ curl https://phar.phpunit.de/phpunit.phar -o phpunit.phar
@@ -28,10 +28,39 @@ $ curl -O http://pear.php.net/go-pear.phar
$ sudo php -d detect_unicode=0 go-pear.phar
```
-## Build from Homebrew
+## Quick Install
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. Run the following command to
-install gRPC.
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC PHP extension
+
+```sh
+sudo pecl install grpc-alpha
+```
+
+**Mac OS X:**
+
+Install [homebrew][]. Example:
+
+```sh
+ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
+```
+
+Install the gRPC core library and the PHP extension in one step
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s php
@@ -39,6 +68,7 @@ $ curl -fsSL https://goo.gl/getgrpc | bash -s php
This will download and run the [gRPC install script][] and compile the gRPC PHP extension.
+
## Build from Source
Clone this repository
@@ -71,7 +101,7 @@ $ sudo make install
Install the gRPC PHP extension
```sh
-$ sudo pecl install grpc
+$ sudo pecl install grpc-alpha
```
OR
@@ -140,6 +170,6 @@ $ ./bin/run_gen_code_test.sh
```
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Node]:https://github.com/grpc/grpc/tree/master/src/node/examples
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 4e40dc43ce..252623d0c3 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -216,13 +216,18 @@ PHP_METHOD(Call, __construct) {
char *method;
int method_len;
zval *deadline_obj;
- /* "OsO" == 1 Object, 1 string, 1 Object */
- if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO", &channel_obj,
- grpc_ce_channel, &method, &method_len,
- &deadline_obj, grpc_ce_timeval) == FAILURE) {
+ char *host_override = NULL;
+ int host_override_len = 0;
+ /* "OsO|s" == 1 Object, 1 string, 1 Object, 1 optional string */
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO|s",
+ &channel_obj, grpc_ce_channel,
+ &method, &method_len,
+ &deadline_obj, grpc_ce_timeval,
+ &host_override, &host_override_len)
+ == FAILURE) {
zend_throw_exception(
spl_ce_InvalidArgumentException,
- "Call expects a Channel, a String, and a Timeval",
+ "Call expects a Channel, a String, a Timeval and an optional String",
1 TSRMLS_CC);
return;
}
@@ -241,7 +246,7 @@ PHP_METHOD(Call, __construct) {
deadline_obj TSRMLS_CC);
call->wrapped = grpc_channel_create_call(
channel->wrapped, NULL, GRPC_PROPAGATE_DEFAULTS, completion_queue, method,
- channel->target, deadline->wrapped, NULL);
+ host_override, deadline->wrapped, NULL);
}
/**
@@ -273,7 +278,6 @@ PHP_METHOD(Call, startBatch) {
grpc_byte_buffer *message;
int cancelled;
grpc_call_error error;
- grpc_event event;
zval *result;
char *message_str;
size_t message_len;
@@ -409,14 +413,8 @@ PHP_METHOD(Call, startBatch) {
(long)error TSRMLS_CC);
goto cleanup;
}
- event = grpc_completion_queue_pluck(completion_queue, call->wrapped,
- gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
- if (!event.success) {
- zend_throw_exception(spl_ce_LogicException,
- "The batch failed for some reason",
- 1 TSRMLS_CC);
- goto cleanup;
- }
+ grpc_completion_queue_pluck(completion_queue, call->wrapped,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
for (int i = 0; i < op_num; i++) {
switch(ops[i].op) {
case GRPC_OP_SEND_INITIAL_METADATA:
diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c
index f8ce04d902..7a981675de 100644
--- a/src/php/ext/grpc/channel.c
+++ b/src/php/ext/grpc/channel.c
@@ -64,7 +64,6 @@ void free_wrapped_grpc_channel(void *object TSRMLS_DC) {
if (channel->wrapped != NULL) {
grpc_channel_destroy(channel->wrapped);
}
- efree(channel->target);
efree(channel);
}
@@ -141,9 +140,6 @@ PHP_METHOD(Channel, __construct) {
HashTable *array_hash;
zval **creds_obj = NULL;
wrapped_grpc_credentials *creds = NULL;
- zval **override_obj;
- char *override;
- int override_len;
/* "s|a" == 1 string, 1 optional array */
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|a", &target,
&target_length, &args_array) == FAILURE) {
@@ -151,8 +147,6 @@ PHP_METHOD(Channel, __construct) {
"Channel expects a string and an array", 1 TSRMLS_CC);
return;
}
- override = target;
- override_len = target_length;
if (args_array == NULL) {
channel->wrapped = grpc_insecure_channel_create(target, NULL, NULL);
} else {
@@ -169,19 +163,6 @@ PHP_METHOD(Channel, __construct) {
*creds_obj TSRMLS_CC);
zend_hash_del(array_hash, "credentials", 12);
}
- if (zend_hash_find(array_hash, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
- sizeof(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG),
- (void **)&override_obj) == SUCCESS) {
- if (Z_TYPE_PP(override_obj) != IS_STRING) {
- zend_throw_exception(spl_ce_InvalidArgumentException,
- GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
- " must be a string",
- 1 TSRMLS_CC);
- return;
- }
- override = Z_STRVAL_PP(override_obj);
- override_len = Z_STRLEN_PP(override_obj);
- }
php_grpc_read_args_array(args_array, &args);
if (creds == NULL) {
channel->wrapped = grpc_insecure_channel_create(target, &args, NULL);
@@ -192,8 +173,6 @@ PHP_METHOD(Channel, __construct) {
}
efree(args.args);
}
- channel->target = ecalloc(override_len + 1, sizeof(char));
- memcpy(channel->target, override, override_len);
}
/**
diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h
index c13fa4c6d7..78a16ed0c9 100755
--- a/src/php/ext/grpc/channel.h
+++ b/src/php/ext/grpc/channel.h
@@ -53,7 +53,6 @@ typedef struct wrapped_grpc_channel {
zend_object std;
grpc_channel *wrapped;
- char *target;
} wrapped_grpc_channel;
/* Initializes the Channel class */
diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
index 8b7e67f57c..287621d930 100644
--- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
+++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php
@@ -39,6 +39,14 @@ abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase {
protected static $client;
protected static $timeout;
+ public function testWaitForNotReady() {
+ $this->assertFalse(self::$client->waitForReady(1));
+ }
+
+ public function testWaitForReady() {
+ $this->assertTrue(self::$client->waitForReady(250000));
+ }
+
public function testSimpleRequest() {
$div_arg = new math\DivArgs();
$div_arg->setDividend(7);
diff --git a/src/php/tests/generated_code/GeneratedCodeTest.php b/src/php/tests/generated_code/GeneratedCodeTest.php
index 1e4742fc80..a1a2ce81db 100755
--- a/src/php/tests/generated_code/GeneratedCodeTest.php
+++ b/src/php/tests/generated_code/GeneratedCodeTest.php
@@ -35,7 +35,7 @@ require 'AbstractGeneratedCodeTest.php';
class GeneratedCodeTest extends AbstractGeneratedCodeTest {
public static function setUpBeforeClass() {
- self::$client = new math\MathClient(new Grpc\BaseStub(
- getenv('GRPC_TEST_HOST'), []));
+ self::$client = new math\MathClient(
+ getenv('GRPC_TEST_HOST'), []);
}
}
diff --git a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
index f8ec1e7da8..68f57d34ad 100644
--- a/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
+++ b/src/php/tests/generated_code/GeneratedCodeWithCallbackTest.php
@@ -35,13 +35,13 @@ require 'AbstractGeneratedCodeTest.php';
class GeneratedCodeWithCallbackTest extends AbstractGeneratedCodeTest {
public static function setUpBeforeClass() {
- self::$client = new math\MathClient(new Grpc\BaseStub(
+ self::$client = new math\MathClient(
getenv('GRPC_TEST_HOST'), ['update_metadata' =>
function($a_hash,
$client = array()) {
$a_copy = $a_hash;
$a_copy['foo'] = ['bar'];
return $a_copy;
- }]));
+ }]);
}
}
diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php
index 44e6242c29..376d306da0 100755
--- a/src/php/tests/interop/interop_client.php
+++ b/src/php/tests/interop/interop_client.php
@@ -271,7 +271,7 @@ function cancelAfterFirstResponse($stub) {
}
function timeoutOnSleepingServer($stub) {
- $call = $stub->FullDuplexCall(array('timeout' => 500000));
+ $call = $stub->FullDuplexCall(array('timeout' => 1000));
$request = new grpc\testing\StreamingOutputCallRequest();
$request->setResponseType(grpc\testing\PayloadType::COMPRESSABLE);
$response_parameters = new grpc\testing\ResponseParameters();
diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php
index f91c006da5..60341b983d 100755
--- a/src/php/tests/unit_tests/SecureEndToEndTest.php
+++ b/src/php/tests/unit_tests/SecureEndToEndTest.php
@@ -40,13 +40,15 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
file_get_contents(dirname(__FILE__) . '/../data/server1.key'),
file_get_contents(dirname(__FILE__) . '/../data/server1.pem'));
$this->server = new Grpc\Server();
- $port = $this->server->addSecureHttp2Port('0.0.0.0:0',
+ $this->port = $this->server->addSecureHttp2Port('0.0.0.0:0',
$server_credentials);
$this->server->start();
+ $this->host_override = 'foo.test.google.fr';
$this->channel = new Grpc\Channel(
- 'localhost:' . $port,
+ 'localhost:' . $this->port,
[
- 'grpc.ssl_target_name_override' => 'foo.test.google.fr',
+ 'grpc.ssl_target_name_override' => $this->host_override,
+ 'grpc.default_authority' => $this->host_override,
'credentials' => $credentials
]);
}
@@ -61,7 +63,8 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$status_text = 'xyz';
$call = new Grpc\Call($this->channel,
'dummy_method',
- $deadline);
+ $deadline,
+ $this->host_override);
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
@@ -112,7 +115,8 @@ class SecureEndToEndTest extends PHPUnit_Framework_TestCase{
$call = new Grpc\Call($this->channel,
'dummy_method',
- $deadline);
+ $deadline,
+ $this->host_override);
$event = $call->startBatch([
Grpc\OP_SEND_INITIAL_METADATA => [],
diff --git a/src/python/README.md b/src/python/README.md
index 2beb3a913a..de0142db05 100644
--- a/src/python/README.md
+++ b/src/python/README.md
@@ -9,12 +9,36 @@ Alpha : Ready for early adopters
PREREQUISITES
-------------
- Python 2.7, virtualenv, pip
-- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
INSTALLATION
-------------
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
-Run the following command to install gRPC Python.
+
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC Python module
+
+```sh
+sudo pip install grpcio
+```
+
+**Mac OS X**
+
+Install [homebrew][]. Run the following command to install gRPC Python.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s python
```
@@ -27,11 +51,6 @@ Please read our online documentation for a [Quick Start][] and a [detailed examp
BUILDING FROM SOURCE
---------------------
- Clone this repository
-- Build the gRPC core from the root of the
- [gRPC Git repository](https://github.com/grpc/grpc)
-```
-$ make shared_c static_c
-```
- Use build_python.sh to build the Python code and install it into a virtual environment
```
@@ -60,7 +79,7 @@ $ ../../tools/distrib/python/submit.py
```
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html
[detailed example]:http://www.grpc.io/docs/installation/python.html
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index 00bdecf56f..c7b5a3bde4 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -6,7 +6,7 @@ Package for GRPC Python.
Dependencies
------------
-Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_.
+Ensure you have installed the gRPC core. On Mac OS X, install homebrew_.
Run the following command to install gRPC Python.
::
@@ -19,5 +19,4 @@ Otherwise, `install from source`_
.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source
.. _homebrew: http://brew.sh
-.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation
.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
diff --git a/src/ruby/README.md b/src/ruby/README.md
index 4b657c0bd4..f8902e34c5 100644
--- a/src/ruby/README.md
+++ b/src/ruby/README.md
@@ -12,12 +12,36 @@ PREREQUISITES
-------------
- Ruby 2.x. The gRPC API uses keyword args.
-- [homebrew][] on Mac OS X, [linuxbrew][] on Linux. These simplify the installation of the gRPC C core.
+- [homebrew][] on Mac OS X. These simplify the installation of the gRPC C core.
INSTALLATION
---------------
-On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][].
-Run the following command to install gRPC Ruby.
+
+**Linux (Debian):**
+
+Add [Debian unstable][] to your `sources.list` file. Example:
+
+```sh
+echo "deb http://ftp.us.debian.org/debian unstable main contrib non-free" | \
+sudo tee -a /etc/apt/sources.list
+```
+
+Install the gRPC Debian package
+
+```sh
+sudo apt-get update
+sudo apt-get install libgrpc-dev
+```
+
+Install the gRPC Ruby package
+
+```sh
+gem install grpc
+```
+
+**Mac OS X**
+
+Install [homebrew][]. Run the following command to install gRPC Ruby.
```sh
$ curl -fsSL https://goo.gl/getgrpc | bash -s ruby
```
@@ -26,12 +50,6 @@ This will download and run the [gRPC install script][], then install the latest
BUILD FROM SOURCE
---------------------
- Clone this repository
-- Build the gRPC C core
-E.g, from the root of the gRPC [Git repository](https://github.com/google/grpc)
-```sh
-$ cd ../..
-$ make && sudo make install
-```
- Install Ruby 2.x. Consider doing this with [RVM](http://rvm.io), it's a nice way of controlling
the exact ruby version that's used.
@@ -77,8 +95,8 @@ Directory structure is the layout for [ruby extensions][]
GRPC.logger.info("Answer: #{resp.inspect}")
```
[homebrew]:http://brew.sh
-[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation
[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install
[ruby extensions]:http://guides.rubygems.org/gems-with-extensions/
[rubydoc]: http://www.rubydoc.info/gems/grpc
[grpc.io]: http://www.grpc.io/docs/installation/ruby.html
+[Debian unstable]:https://www.debian.org/releases/sid/
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 36c6818a7e..6b5beb6f5d 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -82,6 +82,10 @@ static ID id_metadata;
* received by the call and subsequently saved on it. */
static ID id_status;
+/* id_write_flag is name of the attribute used to access the write_flag
+ * saved on the call. */
+static ID id_write_flag;
+
/* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
static VALUE sym_send_message;
static VALUE sym_send_metadata;
@@ -240,6 +244,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
return rb_ivar_set(self, id_metadata, metadata);
}
+/*
+ call-seq:
+ write_flag = call.write_flag
+
+ Gets the write_flag value saved the call. */
+static VALUE grpc_rb_call_get_write_flag(VALUE self) {
+ return rb_ivar_get(self, id_write_flag);
+}
+
+/*
+ call-seq:
+ call.write_flag = write_flag
+
+ Saves the write_flag on the call. */
+static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
+ if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
+ rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
+ rb_obj_classname(write_flag));
+ return Qnil;
+ }
+
+ return rb_ivar_set(self, id_write_flag, write_flag);
+}
+
/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
to fill grpc_metadata_array.
@@ -437,17 +465,19 @@ typedef struct run_batch_stack {
grpc_status_code recv_status;
char *recv_status_details;
size_t recv_status_details_capacity;
+ uint write_flag;
} run_batch_stack;
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
-static void grpc_run_batch_stack_init(run_batch_stack *st) {
+static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
grpc_metadata_array_init(&st->recv_metadata);
grpc_metadata_array_init(&st->recv_trailing_metadata);
st->op_num = 0;
+ st->write_flag = write_flag;
}
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
@@ -477,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
+ st->ops[st->op_num].flags = 0;
switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
@@ -490,6 +521,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
case GRPC_OP_SEND_MESSAGE:
st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
RSTRING_PTR(this_value), RSTRING_LEN(this_value));
+ st->ops[st->op_num].flags = st->write_flag;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
@@ -525,7 +557,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
NUM2INT(this_op));
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
- st->ops[st->op_num].flags = 0;
st->ops[st->op_num].reserved = NULL;
st->op_num++;
}
@@ -604,6 +635,8 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_event ev;
grpc_call_error err;
VALUE result = Qnil;
+ VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
+ uint write_flag = 0;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
@@ -611,7 +644,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
return Qnil;
}
- grpc_run_batch_stack_init(&st);
+ if (rb_write_flag != Qnil) {
+ write_flag = NUM2UINT(rb_write_flag);
+ }
+ grpc_run_batch_stack_init(&st, write_flag);
grpc_run_batch_stack_fill_ops(&st, ops_hash);
/* call grpc_call_start_batch, then wait for it to complete using
@@ -638,6 +674,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
return result;
}
+static void Init_grpc_write_flags() {
+ /* Constants representing the write flags in grpc.h */
+ VALUE grpc_rb_mWriteFlags =
+ rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
+ rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
+ UINT2NUM(GRPC_WRITE_BUFFER_HINT));
+ rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
+ UINT2NUM(GRPC_WRITE_NO_COMPRESS));
+}
+
static void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
VALUE grpc_rb_mRpcErrors =
@@ -735,10 +781,14 @@ void Init_grpc_call() {
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
+ rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
+ rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
+ 1);
/* Ids used to support call attributes */
id_metadata = rb_intern("metadata");
id_status = rb_intern("status");
+ id_write_flag = rb_intern("write_flag");
/* Ids used by the c wrapping internals. */
id_cq = rb_intern("__cq");
@@ -766,6 +816,7 @@ void Init_grpc_call() {
Init_grpc_error_codes();
Init_grpc_op_codes();
+ Init_grpc_write_flags();
}
/* Gets the call from the ruby object */
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 17da401c6b..d9cb924735 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -59,7 +59,7 @@ module GRPC
include Core::CallOps
extend Forwardable
attr_reader(:deadline)
- def_delegators :@call, :cancel, :metadata
+ def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=
# client_invoke begins a client invocation.
#
@@ -484,6 +484,7 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status, :start_call, :wait)
+ :metadata, :status, :start_call, :wait, :write_flag,
+ :write_flag=)
end
end
diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb
index 2bb7c86d5e..6b442febcb 100644
--- a/src/ruby/lib/grpc/logconfig.rb
+++ b/src/ruby/lib/grpc/logconfig.rb
@@ -54,5 +54,6 @@ module GRPC
LOGGER = NoopLogger.new
end
- include DefaultLogger unless method_defined?(:logger)
+ # Inject the noop #logger if no module-level logger method has been injected.
+ extend DefaultLogger unless methods.include?(:logger)
end
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index 3c5d33ffcd..dd3c45f754 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -31,6 +31,14 @@ require 'grpc'
include GRPC::Core::StatusCodes
+describe GRPC::Core::WriteFlags do
+ it 'should define the known write flag values' do
+ m = GRPC::Core::WriteFlags
+ expect(m.const_get(:BUFFER_HINT)).to_not be_nil
+ expect(m.const_get(:NO_COMPRESS)).to_not be_nil
+ end
+end
+
describe GRPC::Core::RpcErrors do
before(:each) do
@known_types = {
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 26208b714a..fcd7bd082f 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -35,6 +35,7 @@ describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
CallOps = GRPC::Core::CallOps
+ WriteFlags = GRPC::Core::WriteFlags
before(:each) do
@pass_through = proc { |x| x }
@@ -129,6 +130,31 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
+
+ TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
+ TEST_WRITE_FLAGS.each do |f|
+ it "successfully makes calls with write_flag set to #{f}" do
+ call = make_test_call
+ ActiveCall.client_invoke(call, @client_queue)
+ marshal = proc { |x| 'marshalled:' + x }
+ client_call = ActiveCall.new(call, @client_queue, marshal,
+ @pass_through, deadline)
+ msg = 'message is a string'
+ client_call.write_flag = f
+ client_call.remote_send(msg)
+
+ # confirm that the message was marshalled
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_call = recvd_rpc.call
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
+ @pass_through, deadline)
+ expect(server_call.remote_read).to eq('marshalled:' + msg)
+ end
+ end
end
describe '#client_invoke' do
@@ -261,7 +287,7 @@ describe GRPC::ActiveCall do
client_call.writes_done(false)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
- n = 3 # arbitrary value > 1
+ n = 3 # arbitrary value > 1
n.times do
server_call.remote_send(reply)
expect(e.next).to eq(reply)