aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj7
-rw-r--r--src/csharp/Grpc.Core.Tests/packages.config1
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs18
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs27
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs10
-rw-r--r--src/csharp/Grpc.Core/Call.cs2
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.csproj3
-rw-r--r--src/csharp/Grpc.Core/Grpc.Core.nuspec1
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamReader.cs8
-rw-r--r--src/csharp/Grpc.Core/IAsyncStreamWriter.cs3
-rw-r--r--src/csharp/Grpc.Core/IClientStreamWriter.cs5
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientRequestStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs11
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs29
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs3
-rw-r--r--src/csharp/Grpc.Core/ServerCallContext.cs1
-rw-r--r--src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs26
-rw-r--r--src/csharp/Grpc.Core/packages.config1
-rw-r--r--src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj4
-rw-r--r--src/csharp/Grpc.Examples.Tests/packages.config1
-rw-r--r--src/csharp/Grpc.Examples/Grpc.Examples.csproj3
-rw-r--r--src/csharp/Grpc.Examples/packages.config1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj3
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs43
-rw-r--r--src/csharp/Grpc.IntegrationTesting/packages.config1
-rwxr-xr-xsrc/ruby/bin/apis/pubsub_demo.rb4
-rwxr-xr-xsrc/ruby/bin/interop/interop_client.rb6
-rwxr-xr-xsrc/ruby/bin/interop/interop_server.rb8
-rwxr-xr-xsrc/ruby/bin/math_client.rb44
-rwxr-xr-xsrc/ruby/bin/math_server.rb8
-rwxr-xr-xsrc/ruby/bin/noproto_client.rb8
-rwxr-xr-xsrc/ruby/bin/noproto_server.rb6
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb10
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb22
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb14
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb28
-rw-r--r--src/ruby/lib/grpc/generic/service.rb8
-rw-r--r--src/ruby/lib/grpc/logconfig.rb5
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb4
40 files changed, 205 insertions, 215 deletions
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index eac8d16fb1..62cb443272 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -34,6 +34,9 @@
<HintPath>..\packages\NUnit.2.6.4\lib\nunit.framework.dll</HintPath>
</Reference>
<Reference Include="System" />
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
@@ -57,7 +60,5 @@
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
- <ItemGroup>
- <Folder Include="Internal\" />
- </ItemGroup>
+ <ItemGroup />
</Project> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core.Tests/packages.config b/src/csharp/Grpc.Core.Tests/packages.config
index c714ef3a23..28af8d78c6 100644
--- a/src/csharp/Grpc.Core.Tests/packages.config
+++ b/src/csharp/Grpc.Core.Tests/packages.config
@@ -1,4 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index b95776f66d..8cdc1c895b 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -41,8 +41,6 @@ namespace Grpc.Core
/// Return type for client streaming calls.
/// </summary>
public sealed class AsyncClientStreamingCall<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly Task<TResponse> result;
@@ -54,22 +52,6 @@ namespace Grpc.Core
}
/// <summary>
- /// Writes a request to RequestStream.
- /// </summary>
- public Task Write(TRequest message)
- {
- return requestStream.Write(message);
- }
-
- /// <summary>
- /// Closes the RequestStream.
- /// </summary>
- public Task Close()
- {
- return requestStream.Close();
- }
-
- /// <summary>
/// Asynchronous call result.
/// </summary>
public Task<TResponse> Result
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index ee05437416..0d13a3d052 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -41,8 +41,6 @@ namespace Grpc.Core
/// Return type for bidirectional streaming calls.
/// </summary>
public sealed class AsyncDuplexStreamingCall<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
{
readonly IClientStreamWriter<TRequest> requestStream;
readonly IAsyncStreamReader<TResponse> responseStream;
@@ -54,31 +52,6 @@ namespace Grpc.Core
}
/// <summary>
- /// Writes a request to RequestStream.
- /// </summary>
- public Task Write(TRequest message)
- {
- return requestStream.Write(message);
- }
-
- /// <summary>
- /// Closes the RequestStream.
- /// </summary>
- public Task Close()
- {
- return requestStream.Close();
- }
-
- /// <summary>
- /// Reads a response from ResponseStream.
- /// </summary>
- /// <returns></returns>
- public Task<TResponse> ReadNext()
- {
- return responseStream.ReadNext();
- }
-
- /// <summary>
/// Async stream to read streaming responses.
/// </summary>
public IAsyncStreamReader<TResponse> ResponseStream
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index 73b9614985..6a258d132c 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -41,7 +41,6 @@ namespace Grpc.Core
/// Return type for server streaming calls.
/// </summary>
public sealed class AsyncServerStreamingCall<TResponse>
- where TResponse : class
{
readonly IAsyncStreamReader<TResponse> responseStream;
@@ -51,15 +50,6 @@ namespace Grpc.Core
}
/// <summary>
- /// Reads the next response from ResponseStream
- /// </summary>
- /// <returns></returns>
- public Task<TResponse> ReadNext()
- {
- return responseStream.ReadNext();
- }
-
- /// <summary>
/// Async stream to read streaming responses.
/// </summary>
public IAsyncStreamReader<TResponse> ResponseStream
diff --git a/src/csharp/Grpc.Core/Call.cs b/src/csharp/Grpc.Core/Call.cs
index d1ee59ff0a..37b452f020 100644
--- a/src/csharp/Grpc.Core/Call.cs
+++ b/src/csharp/Grpc.Core/Call.cs
@@ -41,8 +41,6 @@ namespace Grpc.Core
/// Abstraction of a call to be invoked on a client.
/// </summary>
public class Call<TRequest, TResponse>
- where TRequest : class
- where TResponse : class
{
readonly string name;
readonly Marshaller<TRequest> requestMarshaller;
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index f5f2cf5f22..6b4345cbe1 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -37,6 +37,9 @@
<Reference Include="System.Collections.Immutable">
<HintPath>..\packages\Microsoft.Bcl.Immutable.1.0.34\lib\portable-net45+win8+wp8+wpa81\System.Collections.Immutable.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncDuplexStreamingCall.cs" />
diff --git a/src/csharp/Grpc.Core/Grpc.Core.nuspec b/src/csharp/Grpc.Core/Grpc.Core.nuspec
index e54908cb8b..5269881afa 100644
--- a/src/csharp/Grpc.Core/Grpc.Core.nuspec
+++ b/src/csharp/Grpc.Core/Grpc.Core.nuspec
@@ -16,6 +16,7 @@
<tags>gRPC RPC Protocol HTTP/2</tags>
<dependencies>
<dependency id="Microsoft.Bcl.Immutable" version="1.0.34" />
+ <dependency id="Ix-Async" version="1.2.3" />
<dependency id="grpc.native.csharp_ext" version="0.8.0.0" />
</dependencies>
</metadata>
diff --git a/src/csharp/Grpc.Core/IAsyncStreamReader.cs b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
index 699741cd05..95b674c018 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamReader.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamReader.cs
@@ -43,13 +43,7 @@ namespace Grpc.Core
/// A stream of messages to be read.
/// </summary>
/// <typeparam name="T"></typeparam>
- public interface IAsyncStreamReader<T>
- where T : class
+ public interface IAsyncStreamReader<TResponse> : IAsyncEnumerator<TResponse>
{
- /// <summary>
- /// Reads a single message. Returns null if the last message was already read.
- /// A following read can only be started when the previous one finishes.
- /// </summary>
- Task<T> ReadNext();
}
}
diff --git a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
index 4bd8bfb8df..644f445401 100644
--- a/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IAsyncStreamWriter.cs
@@ -44,10 +44,9 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IAsyncStreamWriter<T>
- where T : class
{
/// <summary>
- /// Writes a single message. Only one write can be pending at a time.
+ /// Writes a single asynchronously. Only one write can be pending at a time.
/// </summary>
/// <param name="message">the message to be written. Cannot be null.</param>
Task Write(T message);
diff --git a/src/csharp/Grpc.Core/IClientStreamWriter.cs b/src/csharp/Grpc.Core/IClientStreamWriter.cs
index 0847a928e6..cc76d1369d 100644
--- a/src/csharp/Grpc.Core/IClientStreamWriter.cs
+++ b/src/csharp/Grpc.Core/IClientStreamWriter.cs
@@ -44,11 +44,10 @@ namespace Grpc.Core
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IClientStreamWriter<T> : IAsyncStreamWriter<T>
- where T : class
{
/// <summary>
- /// Closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
+ /// Completes/closes the stream. Can only be called once there is no pending write. No writes should follow calling this.
/// </summary>
- Task Close();
+ Task Complete();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
index 1697058732..b9fc10cd16 100644
--- a/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientRequestStream.cs
@@ -38,8 +38,6 @@ namespace Grpc.Core.Internal
/// Writes requests asynchronously to an underlying AsyncCall object.
/// </summary>
internal class ClientRequestStream<TRequest, TResponse> : IClientStreamWriter<TRequest>
- where TRequest : class
- where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
@@ -55,7 +53,7 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
- public Task Close()
+ public Task Complete()
{
var taskSource = new AsyncCompletionTaskSource<object>();
call.StartSendCloseFromClient(taskSource.CompletionDelegate);
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index b2378cade6..6c44521038 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCall<TRequest, TResponse> call;
+ TResponse current;
public ClientResponseStream(AsyncCall<TRequest, TResponse> call)
{
this.call = call;
}
- public Task<TResponse> ReadNext()
+ public TResponse Current
{
+ get
+ {
+ if (current == null)
+ {
+ throw new InvalidOperationException("No current element is available.");
+ }
+ return current;
+ }
+ }
+
+ public async Task<bool> MoveNext(CancellationToken token)
+ {
+ if (token != CancellationToken.None)
+ {
+ throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ }
var taskSource = new AsyncCompletionTaskSource<TResponse>();
call.StartReadMessage(taskSource.CompletionDelegate);
- return taskSource.Task;
+ var result = await taskSource.Task;
+ this.current = result;
+ return result != null;
+ }
+
+ public void Dispose()
+ {
+ // TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 95d8e97869..20ac46c234 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -71,9 +72,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
- var request = await requestStream.ReadNext();
+ Preconditions.CheckArgument(await requestStream.MoveNext());
+ var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+ Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
var result = await handler(context, request);
await responseStream.Write(result);
@@ -122,9 +124,10 @@ namespace Grpc.Core.Internal
Status status = Status.DefaultSuccess;
try
{
- var request = await requestStream.ReadNext();
+ Preconditions.CheckArgument(await requestStream.MoveNext());
+ var request = requestStream.Current;
// TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- Preconditions.CheckArgument(await requestStream.ReadNext() == null);
+ Preconditions.CheckArgument(!await requestStream.MoveNext());
var context = new ServerCallContext(); // TODO(jtattermusch): initialize the context
await handler(context, request, responseStream);
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index d9ee0c815b..3fccb88abb 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -33,6 +33,7 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
namespace Grpc.Core.Internal
@@ -42,17 +43,41 @@ namespace Grpc.Core.Internal
where TResponse : class
{
readonly AsyncCallServer<TRequest, TResponse> call;
+ TRequest current;
public ServerRequestStream(AsyncCallServer<TRequest, TResponse> call)
{
this.call = call;
}
- public Task<TRequest> ReadNext()
+ public TRequest Current
{
+ get
+ {
+ if (current == null)
+ {
+ throw new InvalidOperationException("No current element is available.");
+ }
+ return current;
+ }
+ }
+
+ public async Task<bool> MoveNext(CancellationToken token)
+ {
+ if (token != CancellationToken.None)
+ {
+ throw new InvalidOperationException("Cancellation of individual reads is not supported.");
+ }
var taskSource = new AsyncCompletionTaskSource<TRequest>();
call.StartReadMessage(taskSource.CompletionDelegate);
- return taskSource.Task;
+ var result = await taskSource.Task;
+ this.current = result;
+ return result != null;
+ }
+
+ public void Dispose()
+ {
+ // TODO(jtattermusch): implement the semantics of stream disposal.
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index 731ea2be81..7a1c016ae2 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -39,9 +39,6 @@ using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
- // TODO: we need to make sure that the delegates are not collected before invoked.
- //internal delegate void ServerShutdownCallbackDelegate(bool success);
-
/// <summary>
/// grpc_server from grpc/grpc.h
/// </summary>
diff --git a/src/csharp/Grpc.Core/ServerCallContext.cs b/src/csharp/Grpc.Core/ServerCallContext.cs
index e873b3e88a..bc9a499c51 100644
--- a/src/csharp/Grpc.Core/ServerCallContext.cs
+++ b/src/csharp/Grpc.Core/ServerCallContext.cs
@@ -42,7 +42,6 @@ namespace Grpc.Core
/// </summary>
public sealed class ServerCallContext
{
-
// TODO(jtattermusch): add cancellationToken
// TODO(jtattermusch): add deadline info
diff --git a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
index f915155f8a..a4f8989b30 100644
--- a/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
+++ b/src/csharp/Grpc.Core/Utils/AsyncStreamExtensions.cs
@@ -49,14 +49,9 @@ namespace Grpc.Core.Utils
public static async Task ForEach<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
where T : class
{
- while (true)
+ while (await streamReader.MoveNext())
{
- var elem = await streamReader.ReadNext();
- if (elem == null)
- {
- break;
- }
- await asyncAction(elem);
+ await asyncAction(streamReader.Current);
}
}
@@ -67,32 +62,27 @@ namespace Grpc.Core.Utils
where T : class
{
var result = new List<T>();
- while (true)
+ while (await streamReader.MoveNext())
{
- var elem = await streamReader.ReadNext();
- if (elem == null)
- {
- break;
- }
- result.Add(elem);
+ result.Add(streamReader.Current);
}
return result;
}
/// <summary>
/// Writes all elements from given enumerable to the stream.
- /// Closes the stream afterwards unless close = false.
+ /// Completes the stream afterwards unless close = false.
/// </summary>
- public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool close = true)
+ public static async Task WriteAll<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
where T : class
{
foreach (var element in elements)
{
await streamWriter.Write(element);
}
- if (close)
+ if (complete)
{
- await streamWriter.Close();
+ await streamWriter.Complete();
}
}
diff --git a/src/csharp/Grpc.Core/packages.config b/src/csharp/Grpc.Core/packages.config
index 71967de56e..fb7eaaeeda 100644
--- a/src/csharp/Grpc.Core/packages.config
+++ b/src/csharp/Grpc.Core/packages.config
@@ -2,5 +2,6 @@
<packages>
<package id="grpc.dependencies.openssl.redist" version="1.0.2.2" targetFramework="net45" />
<package id="grpc.dependencies.zlib.redist" version="1.2.8.9" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl.Immutable" version="1.0.34" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
index 87ccf07dd8..6e84add42b 100644
--- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
+++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
@@ -37,6 +37,10 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async, Version=1.2.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/src/csharp/Grpc.Examples.Tests/packages.config b/src/csharp/Grpc.Examples.Tests/packages.config
index 4d6ec63b3c..cc6e9af40f 100644
--- a/src/csharp/Grpc.Examples.Tests/packages.config
+++ b/src/csharp/Grpc.Examples.Tests/packages.config
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
index 2c5019c214..5ce490f403 100644
--- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj
+++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
@@ -35,6 +35,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
diff --git a/src/csharp/Grpc.Examples/packages.config b/src/csharp/Grpc.Examples/packages.config
index 51c17bcd5e..4c8d60fa62 100644
--- a/src/csharp/Grpc.Examples/packages.config
+++ b/src/csharp/Grpc.Examples/packages.config
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="NUnit" version="2.6.4" targetFramework="net45" />
</packages> \ No newline at end of file
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 1ca3dd24e1..b3a0a2917b 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -54,6 +54,9 @@
<Reference Include="Google.ProtocolBuffers">
<HintPath>..\packages\Google.ProtocolBuffers.2.4.1.521\lib\net40\Google.ProtocolBuffers.dll</HintPath>
</Reference>
+ <Reference Include="System.Interactive.Async">
+ <HintPath>..\packages\Ix-Async.1.2.3\lib\net45\System.Interactive.Async.dll</HintPath>
+ </Reference>
<Reference Include="System.Net" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Net.Http.Extensions">
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index 02f8a369de..d907699698 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -256,48 +256,45 @@ namespace Grpc.IntegrationTesting
var call = client.FullDuplexCall();
- StreamingOutputCallResponse response;
-
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(9))
.SetPayload(CreateZerosPayload(8)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(9, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(9, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(2653))
.SetPayload(CreateZerosPayload(1828)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(2653, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(2653, call.ResponseStream.Current.Payload.Body.Length);
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(58979))
.SetPayload(CreateZerosPayload(45904)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(58979, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(58979, call.ResponseStream.Current.Payload.Body.Length);
- await call.RequestStream.Close();
+ await call.RequestStream.Complete();
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(null, response);
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
Console.WriteLine("Passed!");
}).Wait();
@@ -309,7 +306,7 @@ namespace Grpc.IntegrationTesting
{
Console.WriteLine("running empty_stream");
var call = client.FullDuplexCall();
- await call.Close();
+ await call.RequestStream.Complete();
var responseList = await call.ResponseStream.ToList();
Assert.AreEqual(0, responseList.Count);
@@ -392,22 +389,20 @@ namespace Grpc.IntegrationTesting
var cts = new CancellationTokenSource();
var call = client.FullDuplexCall(cts.Token);
- StreamingOutputCallResponse response;
-
await call.RequestStream.Write(StreamingOutputCallRequest.CreateBuilder()
.SetResponseType(PayloadType.COMPRESSABLE)
.AddResponseParameters(ResponseParameters.CreateBuilder().SetSize(31415))
.SetPayload(CreateZerosPayload(27182)).Build());
- response = await call.ResponseStream.ReadNext();
- Assert.AreEqual(PayloadType.COMPRESSABLE, response.Payload.Type);
- Assert.AreEqual(31415, response.Payload.Body.Length);
+ Assert.IsTrue(await call.ResponseStream.MoveNext());
+ Assert.AreEqual(PayloadType.COMPRESSABLE, call.ResponseStream.Current.Payload.Type);
+ Assert.AreEqual(31415, call.ResponseStream.Current.Payload.Body.Length);
cts.Cancel();
try
{
- response = await call.ResponseStream.ReadNext();
+ await call.ResponseStream.MoveNext();
Assert.Fail();
}
catch (RpcException e)
diff --git a/src/csharp/Grpc.IntegrationTesting/packages.config b/src/csharp/Grpc.IntegrationTesting/packages.config
index e33b6e3e46..291b7b8599 100644
--- a/src/csharp/Grpc.IntegrationTesting/packages.config
+++ b/src/csharp/Grpc.IntegrationTesting/packages.config
@@ -3,6 +3,7 @@
<package id="Google.Apis.Auth" version="1.9.1" targetFramework="net45" />
<package id="Google.Apis.Core" version="1.9.1" targetFramework="net45" />
<package id="Google.ProtocolBuffers" version="2.4.1.521" targetFramework="net45" />
+ <package id="Ix-Async" version="1.2.3" targetFramework="net45" />
<package id="Microsoft.Bcl" version="1.1.9" targetFramework="net45" />
<package id="Microsoft.Bcl.Async" version="1.0.168" targetFramework="net45" />
<package id="Microsoft.Bcl.Build" version="1.0.14" targetFramework="net45" />
diff --git a/src/ruby/bin/apis/pubsub_demo.rb b/src/ruby/bin/apis/pubsub_demo.rb
index 6d69b0f21e..a039d036ac 100755
--- a/src/ruby/bin/apis/pubsub_demo.rb
+++ b/src/ruby/bin/apis/pubsub_demo.rb
@@ -79,7 +79,7 @@ end
def publisher_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::PublisherService::Stub # shorter
- logger.info("... access PublisherService at #{address}")
+ GRPC.logger.info("... access PublisherService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)
@@ -89,7 +89,7 @@ end
def subscriber_stub(opts)
address = "#{opts.host}:#{opts.port}"
stub_clz = Tech::Pubsub::SubscriberService::Stub # shorter
- logger.info("... access SubscriberService at #{address}")
+ GRPC.logger.info("... access SubscriberService at #{address}")
stub_clz.new(address,
creds: ssl_creds, update_metadata: auth_proc(opts),
GRPC::Core::Channel::SSL_TARGET => opts.host)
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index a388924722..8df03ffb3c 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -70,7 +70,7 @@ end
# loads the certificates used to access the test server securely.
def load_prod_cert
fail 'could not find a production cert' if ENV['SSL_CERT_FILE'].nil?
- logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
+ GRPC.logger.info("loading prod certs from #{ENV['SSL_CERT_FILE']}")
File.open(ENV['SSL_CERT_FILE']).read
end
@@ -115,10 +115,10 @@ def create_stub(opts)
stub_opts[:update_metadata] = auth_creds.updater_proc
end
- logger.info("... connecting securely to #{address}")
+ GRPC.logger.info("... connecting securely to #{address}")
Grpc::Testing::TestService::Stub.new(address, **stub_opts)
else
- logger.info("... connecting insecurely to #{address}")
+ GRPC.logger.info("... connecting insecurely to #{address}")
Grpc::Testing::TestService::Stub.new(address)
end
end
diff --git a/src/ruby/bin/interop/interop_server.rb b/src/ruby/bin/interop/interop_server.rb
index 72570d92f3..78cb8dd836 100755
--- a/src/ruby/bin/interop/interop_server.rb
+++ b/src/ruby/bin/interop/interop_server.rb
@@ -129,13 +129,13 @@ class TestTarget < Grpc::Testing::TestService::Service
Thread.new do
begin
reqs.each do |req|
- logger.info("read #{req.inspect}")
+ GRPC.logger.info("read #{req.inspect}")
resp_size = req.response_parameters[0].size
resp = cls.new(payload: Payload.new(type: req.response_type,
body: nulls(resp_size)))
q.push(resp)
end
- logger.info('finished reads')
+ GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@@ -179,10 +179,10 @@ def main
s = GRPC::RpcServer.new
if opts['secure']
s.add_http2_port(host, test_server_creds)
- logger.info("... running securely on #{host}")
+ GRPC.logger.info("... running securely on #{host}")
else
s.add_http2_port(host)
- logger.info("... running insecurely on #{host}")
+ GRPC.logger.info("... running insecurely on #{host}")
end
s.handle(TestTarget)
s.run_till_terminated
diff --git a/src/ruby/bin/math_client.rb b/src/ruby/bin/math_client.rb
index db254efb00..6319cda309 100755
--- a/src/ruby/bin/math_client.rb
+++ b/src/ruby/bin/math_client.rb
@@ -46,51 +46,51 @@ require 'optparse'
include GRPC::Core::TimeConsts
def do_div(stub)
- logger.info('request_response')
- logger.info('----------------')
+ GRPC.logger.info('request_response')
+ GRPC.logger.info('----------------')
req = Math::DivArgs.new(dividend: 7, divisor: 3)
- logger.info("div(7/3): req=#{req.inspect}")
+ GRPC.logger.info("div(7/3): req=#{req.inspect}")
resp = stub.div(req, INFINITE_FUTURE)
- logger.info("Answer: #{resp.inspect}")
- logger.info('----------------')
+ GRPC.logger.info("Answer: #{resp.inspect}")
+ GRPC.logger.info('----------------')
end
def do_sum(stub)
# to make client streaming requests, pass an enumerable of the inputs
- logger.info('client_streamer')
- logger.info('---------------')
+ GRPC.logger.info('client_streamer')
+ GRPC.logger.info('---------------')
reqs = [1, 2, 3, 4, 5].map { |x| Math::Num.new(num: x) }
- logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
+ GRPC.logger.info("sum(1, 2, 3, 4, 5): reqs=#{reqs.inspect}")
resp = stub.sum(reqs) # reqs.is_a?(Enumerable)
- logger.info("Answer: #{resp.inspect}")
- logger.info('---------------')
+ GRPC.logger.info("Answer: #{resp.inspect}")
+ GRPC.logger.info('---------------')
end
def do_fib(stub)
- logger.info('server_streamer')
- logger.info('----------------')
+ GRPC.logger.info('server_streamer')
+ GRPC.logger.info('----------------')
req = Math::FibArgs.new(limit: 11)
- logger.info("fib(11): req=#{req.inspect}")
+ GRPC.logger.info("fib(11): req=#{req.inspect}")
resp = stub.fib(req, INFINITE_FUTURE)
resp.each do |r|
- logger.info("Answer: #{r.inspect}")
+ GRPC.logger.info("Answer: #{r.inspect}")
end
- logger.info('----------------')
+ GRPC.logger.info('----------------')
end
def do_div_many(stub)
- logger.info('bidi_streamer')
- logger.info('-------------')
+ GRPC.logger.info('bidi_streamer')
+ GRPC.logger.info('-------------')
reqs = []
reqs << Math::DivArgs.new(dividend: 7, divisor: 3)
reqs << Math::DivArgs.new(dividend: 5, divisor: 2)
reqs << Math::DivArgs.new(dividend: 7, divisor: 2)
- logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
+ GRPC.logger.info("div(7/3), div(5/2), div(7/2): reqs=#{reqs.inspect}")
resp = stub.div_many(reqs, 10)
resp.each do |r|
- logger.info("Answer: #{r.inspect}")
+ GRPC.logger.info("Answer: #{r.inspect}")
end
- logger.info('----------------')
+ GRPC.logger.info('----------------')
end
def load_test_certs
@@ -132,10 +132,10 @@ def main
p stub_opts
p options['host']
stub = Math::Math::Stub.new(options['host'], **stub_opts)
- logger.info("... connecting securely on #{options['host']}")
+ GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = Math::Math::Stub.new(options['host'])
- logger.info("... connecting insecurely on #{options['host']}")
+ GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
do_div(stub)
diff --git a/src/ruby/bin/math_server.rb b/src/ruby/bin/math_server.rb
index e46d9c671f..b41ccf6ce1 100755
--- a/src/ruby/bin/math_server.rb
+++ b/src/ruby/bin/math_server.rb
@@ -128,13 +128,13 @@ class Calculator < Math::Math::Service
t = Thread.new do
begin
requests.each do |req|
- logger.info("read #{req.inspect}")
+ GRPC.logger.info("read #{req.inspect}")
resp = Math::DivReply.new(quotient: req.dividend / req.divisor,
remainder: req.dividend % req.divisor)
q.push(resp)
Thread.pass # let the internal Bidi threads run
end
- logger.info('finished reads')
+ GRPC.logger.info('finished reads')
q.push(self)
rescue StandardError => e
q.push(e) # share the exception with the enumerator
@@ -176,10 +176,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
- logger.info("... running securely on #{options['host']}")
+ GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
- logger.info("... running insecurely on #{options['host']}")
+ GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(Calculator)
diff --git a/src/ruby/bin/noproto_client.rb b/src/ruby/bin/noproto_client.rb
index f3fd110347..390a9c59c3 100755
--- a/src/ruby/bin/noproto_client.rb
+++ b/src/ruby/bin/noproto_client.rb
@@ -94,15 +94,15 @@ def main
p stub_opts
p options['host']
stub = NoProtoStub.new(options['host'], **stub_opts)
- logger.info("... connecting securely on #{options['host']}")
+ GRPC.logger.info("... connecting securely on #{options['host']}")
else
stub = NoProtoStub.new(options['host'])
- logger.info("... connecting insecurely on #{options['host']}")
+ GRPC.logger.info("... connecting insecurely on #{options['host']}")
end
- logger.info('sending a NoProto rpc')
+ GRPC.logger.info('sending a NoProto rpc')
resp = stub.an_rpc(NoProtoMsg.new)
- logger.info("got a response: #{resp}")
+ GRPC.logger.info("got a response: #{resp}")
end
main
diff --git a/src/ruby/bin/noproto_server.rb b/src/ruby/bin/noproto_server.rb
index f71daeadb3..90baaf9a2e 100755
--- a/src/ruby/bin/noproto_server.rb
+++ b/src/ruby/bin/noproto_server.rb
@@ -63,7 +63,7 @@ class NoProto < NoProtoService
end
def an_rpc(req, _call)
- logger.info('echo service received a request')
+ GRPC.logger.info('echo service received a request')
req
end
end
@@ -98,10 +98,10 @@ def main
s = GRPC::RpcServer.new
if options['secure']
s.add_http2_port(options['host'], test_server_creds)
- logger.info("... running securely on #{options['host']}")
+ GRPC.logger.info("... running securely on #{options['host']}")
else
s.add_http2_port(options['host'])
- logger.info("... running insecurely on #{options['host']}")
+ GRPC.logger.info("... running insecurely on #{options['host']}")
end
s.handle(NoProto)
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 947c39cd22..5f7beb5ab1 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -188,7 +188,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
- logger.debug("sending #{req}, marshalled? #{marshalled}")
+ GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled
payload = req
else
@@ -230,14 +230,14 @@ module GRPC
@call.metadata = batch_result.metadata
@metadata_tag = nil
end
- logger.debug("received req: #{batch_result}")
+ GRPC.logger.debug("received req: #{batch_result}")
unless batch_result.nil? || batch_result.message.nil?
- logger.debug("received req.to_s: #{batch_result.message}")
+ GRPC.logger.debug("received req.to_s: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
- logger.debug("received_req (unmarshalled): #{res.inspect}")
+ GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}")
return res
end
- logger.debug('found nil; the final response has been sent')
+ GRPC.logger.debug('found nil; the final response has been sent')
nil
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 4ca3004d6f..67143d40cf 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -115,10 +115,10 @@ module GRPC
return enum_for(:each_queued_msg) unless block_given?
count = 0
loop do
- logger.debug("each_queued_msg: msg##{count}")
+ GRPC.logger.debug("each_queued_msg: msg##{count}")
count += 1
req = @readq.pop
- logger.debug("each_queued_msg: req = #{req}")
+ GRPC.logger.debug("each_queued_msg: req = #{req}")
throw req if req.is_a? StandardError
break if req.equal?(END_OF_READS)
yield req
@@ -134,22 +134,22 @@ module GRPC
begin
count = 0
requests.each do |req|
- logger.debug("bidi-write_loop: #{count}")
+ GRPC.logger.debug("bidi-write_loop: #{count}")
count += 1
payload = @marshal.call(req)
@call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_MESSAGE => payload)
end
if is_client
- logger.debug("bidi-write-loop: sent #{count}, waiting to finish")
+ GRPC.logger.debug("bidi-write-loop: sent #{count}, waiting")
batch_result = @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
SEND_CLOSE_FROM_CLIENT => nil,
RECV_STATUS_ON_CLIENT => nil)
batch_result.check_status
end
rescue StandardError => e
- logger.warn('bidi-write_loop: failed')
- logger.warn(e)
+ GRPC.logger.warn('bidi-write_loop: failed')
+ GRPC.logger.warn(e)
raise e
end
end
@@ -164,7 +164,7 @@ module GRPC
# queue the initial read before beginning the loop
loop do
- logger.debug("bidi-read_loop: #{count}")
+ GRPC.logger.debug("bidi-read_loop: #{count}")
count += 1
# TODO: ensure metadata is read if available, currently it's not
batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
@@ -172,19 +172,19 @@ module GRPC
# handle the next message
if batch_result.message.nil?
@readq.push(END_OF_READS)
- logger.debug('bidi-read-loop: done reading!')
+ GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
# push the latest read onto the queue and continue reading
- logger.debug("received req: #{batch_result.message}")
+ GRPC.logger.debug("received req: #{batch_result.message}")
res = @unmarshal.call(batch_result.message)
@readq.push(res)
end
rescue StandardError => e
- logger.warn('bidi: read_loop failed')
- logger.warn(e)
+ GRPC.logger.warn('bidi: read_loop failed')
+ GRPC.logger.warn(e)
@readq.push(e) # let each_queued_msg terminate with this error
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 10211ae239..2fd61c5f7e 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -84,22 +84,22 @@ module GRPC
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata.
- logger.debug("app err: #{active_call}, status:#{e.code}:#{e.details}")
+ GRPC.logger.debug("app err:#{active_call}, status:#{e.code}:#{e.details}")
send_status(active_call, e.code, e.details, **e.metadata)
rescue Core::CallError => e
# This is raised by GRPC internals but should rarely, if ever happen.
# Log it, but don't notify the other endpoint..
- logger.warn("failed call: #{active_call}\n#{e}")
+ GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime
# This is raised when active_call#method.call exceeeds the deadline
# event. Send a status of deadline exceeded
- logger.warn("late call: #{active_call}")
+ GRPC.logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
rescue StandardError => e
# This will usuaally be an unhandled error in the handling code.
# Send back a UNKNOWN status to the client
- logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
- logger.warn(e)
+ GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
+ GRPC.logger.warn(e)
send_status(active_call, UNKNOWN, 'no reason given')
end
@@ -139,8 +139,8 @@ module GRPC
details = 'Not sure why' if details.nil?
active_client.send_status(code, details, code == OK, **kw)
rescue StandardError => e
- logger.warn("Could not send status #{code}:#{details}")
- logger.warn(e)
+ GRPC.logger.warn("Could not send status #{code}:#{details}")
+ GRPC.logger.warn(e)
end
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index de22466089..665c144432 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -94,7 +94,7 @@ module GRPC
def schedule(*args, &blk)
fail 'already stopped' if @stopped
return if blk.nil?
- logger.info('schedule another job')
+ GRPC.logger.info('schedule another job')
@jobs << [blk, args]
end
@@ -114,14 +114,14 @@ module GRPC
# Stops the jobs in the pool
def stop
- logger.info('stopping, will wait for all the workers to exit')
+ GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
@stopped = true
@stop_mutex.synchronize do # wait @keep_alive for works to stop
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
- logger.info('stopped, all workers are shutdown')
+ GRPC.logger.info('stopped, all workers are shutdown')
end
protected
@@ -129,14 +129,14 @@ module GRPC
# Forcibly shutdown any threads that are still alive.
def forcibly_stop_workers
return unless @workers.size > 0
- logger.info("forcibly terminating #{@workers.size} worker(s)")
+ GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
- logger.warn('error while terminating a worker')
- logger.warn(e)
+ GRPC.logger.warn('error while terminating a worker')
+ GRPC.logger.warn(e)
end
end
end
@@ -156,8 +156,8 @@ module GRPC
blk, args = @jobs.pop
blk.call(*args)
rescue StandardError => e
- logger.warn('Error in worker thread')
- logger.warn(e)
+ GRPC.logger.warn('Error in worker thread')
+ GRPC.logger.warn(e)
end
end
end
@@ -365,7 +365,7 @@ module GRPC
# the server to stop.
def run
if rpc_descs.size.zero?
- logger.warn('did not run as no services were present')
+ GRPC.logger.warn('did not run as no services were present')
return
end
@run_mutex.synchronize do
@@ -381,9 +381,9 @@ module GRPC
# Sends UNAVAILABLE if there are too many unprocessed jobs
def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
- logger.info("waiting: #{jobs_count}, max: #{max}")
+ GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
- logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
+ GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::UNAVAILABLE, '')
@@ -394,7 +394,7 @@ module GRPC
def found?(an_rpc)
mth = an_rpc.method.to_sym
return an_rpc if rpc_descs.key?(mth)
- logger.warn("NOT_FOUND: #{an_rpc}")
+ GRPC.logger.warn("NOT_FOUND: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
c.send_status(StatusCodes::NOT_FOUND, '')
@@ -434,7 +434,7 @@ module GRPC
return nil unless found?(an_rpc)
# Create the ActiveCall
- logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
+ GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
ActiveCall.new(an_rpc.call, @cq,
rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
@@ -474,7 +474,7 @@ module GRPC
else
handlers[route] = service.method(rpc_name)
end
- logger.info("handling #{route} with #{handlers[route]}")
+ GRPC.logger.info("handling #{route} with #{handlers[route]}")
end
end
end
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index 8ea2c82f17..3b9743ea66 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -175,23 +175,23 @@ module GRPC
route = "/#{route_prefix}/#{name}"
if desc.request_response?
define_method(mth_name) do |req, deadline = nil, **kw|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
request_response(route, req, marshal, unmarshal, deadline, **kw)
end
elsif desc.client_streamer?
define_method(mth_name) do |reqs, deadline = nil, **kw|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
client_streamer(route, reqs, marshal, unmarshal, deadline, **kw)
end
elsif desc.server_streamer?
define_method(mth_name) do |req, deadline = nil, **kw, &blk|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
server_streamer(route, req, marshal, unmarshal, deadline, **kw,
&blk)
end
else # is a bidi_stream
define_method(mth_name) do |reqs, deadline = nil, **kw, &blk|
- logger.debug("calling #{@host}:#{route}")
+ GRPC.logger.debug("calling #{@host}:#{route}")
bidi_streamer(route, reqs, marshal, unmarshal, deadline, **kw,
&blk)
end
diff --git a/src/ruby/lib/grpc/logconfig.rb b/src/ruby/lib/grpc/logconfig.rb
index f36906fc45..96812170ba 100644
--- a/src/ruby/lib/grpc/logconfig.rb
+++ b/src/ruby/lib/grpc/logconfig.rb
@@ -29,7 +29,10 @@
require 'logging'
-include Logging.globally # logger is accessible everywhere
+# GRPC contains the General RPC module.
+module GRPC
+ extend Logging.globally
+end
Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = :info
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 2cd21a15e3..640b0f656c 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -69,7 +69,7 @@ class EchoService
end
def an_rpc(req, call)
- logger.info('echo service received a request')
+ GRPC.logger.info('echo service received a request')
call.output_metadata.update(@trailing_metadata)
@received_md << call.metadata unless call.metadata.nil?
req
@@ -109,7 +109,7 @@ class SlowService
end
def an_rpc(req, call)
- logger.info("starting a slow #{@delay} rpc")
+ GRPC.logger.info("starting a slow #{@delay} rpc")
sleep @delay
@received_md << call.metadata unless call.metadata.nil?
req # send back the req as the response