aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs3
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs10
-rw-r--r--src/csharp/Grpc.Core/Server.cs176
-rw-r--r--src/csharp/Grpc.Examples.MathServer/MathServer.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj4
-rw-r--r--src/ruby/ext/grpc/rb_call.c6
-rwxr-xr-xsrc/ruby/grpc.gemspec2
-rw-r--r--src/ruby/lib/grpc.rb1
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb2
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb3
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb27
-rw-r--r--src/ruby/lib/grpc/notifier.rb60
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb31
13 files changed, 189 insertions, 138 deletions
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 14add60c72..c97a3bc2b1 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -33,6 +33,7 @@ using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using Grpc.Core;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@@ -180,7 +181,7 @@ namespace Grpc.Core.Internal
private static void AssertCallOk(GRPCCallError callError)
{
- Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
+ Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
}
private static uint GetFlags(bool buffered)
diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
index a59da09822..8080643d8c 100644
--- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs
@@ -35,6 +35,7 @@ using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
+using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
@@ -105,9 +106,9 @@ namespace Grpc.Core.Internal
grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback);
}
- public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
+ public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback)
{
- return grpcsharp_server_request_call(this, cq, callback);
+ AssertCallOk(grpcsharp_server_request_call(this, cq, callback));
}
protected override bool ReleaseHandle()
@@ -115,5 +116,10 @@ namespace Grpc.Core.Internal
grpcsharp_server_destroy(handle);
return true;
}
+
+ private static void AssertCallOk(GRPCCallError callError)
+ {
+ Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index f086fa8beb..e686cdddef 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -38,27 +38,29 @@ using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
namespace Grpc.Core
{
/// <summary>
- /// Server is implemented only to be able to do
- /// in-process testing.
+ /// A gRPC server.
/// </summary>
public class Server
{
- // TODO: make sure the delegate doesn't get garbage collected while
+ // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while
// native callbacks are in the completion queue.
readonly ServerShutdownCallbackDelegate serverShutdownHandler;
readonly CompletionCallbackDelegate newServerRpcHandler;
- readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>();
readonly ServerSafeHandle handle;
+ readonly object myLock = new object();
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
-
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
+ bool startRequested;
+ bool shutdownRequested;
+
public Server()
{
this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero);
@@ -66,71 +68,81 @@ namespace Grpc.Core
this.serverShutdownHandler = HandleServerShutdown;
}
- // only call this before Start()
+ /// <summary>
+ /// Adds a service definition to the server. This is how you register
+ /// handlers for a service with the server.
+ /// Only call this before Start().
+ /// </summary>
public void AddServiceDefinition(ServerServiceDefinition serviceDefinition)
{
- foreach (var entry in serviceDefinition.CallHandlers)
+ lock (myLock)
{
- callHandlers.Add(entry.Key, entry.Value);
+ Preconditions.CheckState(!startRequested);
+ foreach (var entry in serviceDefinition.CallHandlers)
+ {
+ callHandlers.Add(entry.Key, entry.Value);
+ }
}
}
- // only call before Start()
+ /// <summary>
+ /// Add a non-secure port on which server should listen.
+ /// Only call this before Start().
+ /// </summary>
public int AddListeningPort(string addr)
{
- return handle.AddListeningPort(addr);
- }
-
- // only call before Start()
- public int AddListeningPort(string addr, ServerCredentials credentials)
- {
- using (var nativeCredentials = credentials.ToNativeCredentials())
+ lock (myLock)
{
- return handle.AddListeningPort(addr, nativeCredentials);
+ Preconditions.CheckState(!startRequested);
+ return handle.AddListeningPort(addr);
}
}
- public void Start()
- {
- handle.Start();
-
- // TODO: this basically means the server is single threaded....
- StartHandlingRpcs();
- }
-
/// <summary>
- /// Requests and handles single RPC call.
+ /// Add a secure port on which server should listen.
+ /// Only call this before Start().
/// </summary>
- internal void RunRpc()
+ public int AddListeningPort(string addr, ServerCredentials credentials)
{
- AllowOneRpc();
-
- try
+ lock (myLock)
{
- var rpcInfo = newRpcQueue.Take();
-
- // Console.WriteLine("Server received RPC " + rpcInfo.Method);
-
- IServerCallHandler callHandler;
- if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler))
+ Preconditions.CheckState(!startRequested);
+ using (var nativeCredentials = credentials.ToNativeCredentials())
{
- callHandler = new NoSuchMethodCallHandler();
+ return handle.AddListeningPort(addr, nativeCredentials);
}
- callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue());
}
- catch (Exception e)
+ }
+
+ /// <summary>
+ /// Starts the server.
+ /// </summary>
+ public void Start()
+ {
+ lock (myLock)
{
- Console.WriteLine("Exception while handling RPC: " + e);
+ Preconditions.CheckState(!startRequested);
+ startRequested = true;
+
+ handle.Start();
+ AllowOneRpc();
}
}
/// <summary>
/// Requests server shutdown and when there are no more calls being serviced,
- /// cleans up used resources.
+ /// cleans up used resources. The returned task finishes when shutdown procedure
+ /// is complete.
/// </summary>
- /// <returns>The async.</returns>
public async Task ShutdownAsync()
{
+ lock (myLock)
+ {
+ Preconditions.CheckState(startRequested);
+ Preconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+
handle.ShutdownAndNotify(serverShutdownHandler);
await shutdownTcs.Task;
handle.Dispose();
@@ -152,19 +164,43 @@ namespace Grpc.Core
handle.Dispose();
}
- private async Task StartHandlingRpcs()
+ /// <summary>
+ /// Allows one new RPC call to be received by server.
+ /// </summary>
+ private void AllowOneRpc()
{
- while (true)
+ lock (myLock)
{
- await Task.Factory.StartNew(RunRpc);
+ if (!shutdownRequested)
+ {
+ handle.RequestCall(GetCompletionQueue(), newServerRpcHandler);
+ }
}
}
- private void AllowOneRpc()
+ /// <summary>
+ /// Selects corresponding handler for given call and handles the call.
+ /// </summary>
+ private void InvokeCallHandler(CallSafeHandle call, string method)
{
- AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler));
+ try
+ {
+ IServerCallHandler callHandler;
+ if (!callHandlers.TryGetValue(method, out callHandler))
+ {
+ callHandler = new NoSuchMethodCallHandler();
+ }
+ callHandler.StartCall(method, call, GetCompletionQueue());
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Exception while handling RPC: " + e);
+ }
}
+ /// <summary>
+ /// Handles the native callback.
+ /// </summary>
private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr)
{
try
@@ -176,13 +212,16 @@ namespace Grpc.Core
// TODO: handle error
}
- var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod());
+ CallSafeHandle call = ctx.GetServerRpcNewCall();
+ string method = ctx.GetServerRpcNewMethod();
// after server shutdown, the callback returns with null call
- if (!rpcInfo.Call.IsInvalid)
+ if (!call.IsInvalid)
{
- newRpcQueue.Add(rpcInfo);
+ Task.Run(() => InvokeCallHandler(call, method));
}
+
+ AllowOneRpc();
}
catch (Exception e)
{
@@ -190,6 +229,10 @@ namespace Grpc.Core
}
}
+ /// <summary>
+ /// Handles native callback.
+ /// </summary>
+ /// <param name="eventPtr"></param>
private void HandleServerShutdown(IntPtr eventPtr)
{
try
@@ -202,42 +245,9 @@ namespace Grpc.Core
}
}
- private static void AssertCallOk(GRPCCallError callError)
- {
- Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK");
- }
-
private static CompletionQueueSafeHandle GetCompletionQueue()
{
return GrpcEnvironment.ThreadPool.CompletionQueue;
}
-
- private struct NewRpcInfo
- {
- private CallSafeHandle call;
- private string method;
-
- public NewRpcInfo(CallSafeHandle call, string method)
- {
- this.call = call;
- this.method = method;
- }
-
- public CallSafeHandle Call
- {
- get
- {
- return this.call;
- }
- }
-
- public string Method
- {
- get
- {
- return this.method;
- }
- }
- }
}
}
diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
index f7429fb43f..abc7ef05e4 100644
--- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs
+++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs
@@ -40,7 +40,7 @@ namespace math
{
public static void Main(string[] args)
{
- String host = "0.0.0.0";
+ string host = "0.0.0.0";
GrpcEnvironment.Initialize();
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 584bf1068d..6ae8041fb7 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -80,5 +80,7 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
- <ItemGroup />
+ <ItemGroup>
+ <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
+ </ItemGroup>
</Project> \ No newline at end of file
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 6da7d3c830..f4ae6fab84 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -607,19 +607,19 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(grpc_rb_eCallError,
"grpc_call_start_batch failed with %s (code=%d)",
grpc_call_error_detail_of(err), err);
- return;
+ return Qnil;
}
ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
if (ev == NULL) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
- return;
+ return Qnil;
}
if (ev->data.op_complete != GRPC_OP_OK) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)",
ev->data.op_complete);
- return;
+ return Qnil;
}
/* Build and return the BatchResult struct result */
diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec
index c633579102..19b3e21cb6 100755
--- a/src/ruby/grpc.gemspec
+++ b/src/ruby/grpc.gemspec
@@ -26,7 +26,7 @@ Gem::Specification.new do |s|
s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1'
s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests
- s.add_dependency 'logging', '~> 1.8'
+ s.add_dependency 'logging', '~> 2.0'
s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests
s.add_development_dependency 'simplecov', '~> 0.9'
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index b0f68035cd..80b5743e91 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -30,6 +30,7 @@
require 'grpc/errors'
require 'grpc/grpc'
require 'grpc/logconfig'
+require 'grpc/notifier'
require 'grpc/version'
require 'grpc/core/time_consts'
require 'grpc/generic/active_call'
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 43ba549905..947c39cd22 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -188,7 +188,7 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
- logger.debug("sending #{req.inspect}, marshalled? #{marshalled}")
+ logger.debug("sending #{req}, marshalled? #{marshalled}")
if marshalled
payload = req
else
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index b813ab5b54..4ca3004d6f 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -123,8 +123,7 @@ module GRPC
break if req.equal?(END_OF_READS)
yield req
end
- @loop_th.join
- @enq_th.join
+ @enq_th.join if @enq_th.alive?
end
# during bidi-streaming, read the requests to send from a separate thread
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 88c24aa92b..3375fcf20a 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -54,6 +54,18 @@ module GRPC
end
module_function :handle_signals
+ # Sets up a signal handler that adds signals to the signal handling global.
+ #
+ # Signal handlers should do as little as humanly possible.
+ # Here, they just add themselves to $grpc_signals
+ #
+ # RpcServer (and later other parts of gRPC) monitors the signals
+ # $grpc_signals in its own non-signal context.
+ def trap_signals
+ %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
+ end
+ module_function :trap_signals
+
# Pool is a simple thread pool.
class Pool
# Default keep alive period is 1s
@@ -172,17 +184,6 @@ module GRPC
# Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0.25
- # Sets up a signal handler that adds signals to the signal handling global.
- #
- # Signal handlers should do as little as humanly possible.
- # Here, they just add themselves to $grpc_signals
- #
- # RpcServer (and later other parts of gRPC) monitors the signals
- # $grpc_signals in its own non-signal context.
- def self.trap_signals
- %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
- end
-
# setup_cq is used by #initialize to constuct a Core::CompletionQueue from
# its arguments.
def self.setup_cq(alt_cq)
@@ -299,12 +300,12 @@ module GRPC
# Runs the server in its own thread, then waits for signal INT or TERM on
# the current thread to terminate it.
def run_till_terminated
- self.class.trap_signals
+ GRPC.trap_signals
t = Thread.new { run }
wait_till_running
loop do
sleep SIGNAL_CHECK_PERIOD
- break unless handle_signals
+ break unless GRPC.handle_signals
end
stop
t.join
diff --git a/src/ruby/lib/grpc/notifier.rb b/src/ruby/lib/grpc/notifier.rb
new file mode 100644
index 0000000000..caa18bbed6
--- /dev/null
+++ b/src/ruby/lib/grpc/notifier.rb
@@ -0,0 +1,60 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# GRPC contains the General RPC module.
+module GRPC
+ # Notifier is useful high-level synchronization primitive.
+ class Notifier
+ attr_reader :payload, :notified
+ alias_method :notified?, :notified
+
+ def initialize
+ @mutex = Mutex.new
+ @cvar = ConditionVariable.new
+ @notified = false
+ @payload = nil
+ end
+
+ def wait
+ @mutex.synchronize do
+ @cvar.wait(@mutex) until notified?
+ end
+ end
+
+ def notify(payload)
+ @mutex.synchronize do
+ return Error.new('already notified') if notified?
+ @payload = payload
+ @notified = true
+ @cvar.signal
+ return nil
+ end
+ end
+ end
+end
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 88c6b44c22..98d68ccfbb 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -29,37 +29,8 @@
require 'grpc'
-# Notifier is useful high-level synchronization primitive.
-class Notifier
- attr_reader :payload, :notified
- alias_method :notified?, :notified
-
- def initialize
- @mutex = Mutex.new
- @cvar = ConditionVariable.new
- @notified = false
- @payload = nil
- end
-
- def wait
- @mutex.synchronize do
- @cvar.wait(@mutex) until notified?
- end
- end
-
- def notify(payload)
- @mutex.synchronize do
- return Error.new('already notified') if notified?
- @payload = payload
- @notified = true
- @cvar.signal
- return nil
- end
- end
-end
-
def wakey_thread(&blk)
- n = Notifier.new
+ n = GRPC::Notifier.new
t = Thread.new do
blk.call(n)
end