diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/csharp/Grpc.Core/Internal/CallSafeHandle.cs | 3 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs | 10 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 176 | ||||
-rw-r--r-- | src/csharp/Grpc.Examples.MathServer/MathServer.cs | 2 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj | 4 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 6 | ||||
-rwxr-xr-x | src/ruby/grpc.gemspec | 2 | ||||
-rw-r--r-- | src/ruby/lib/grpc.rb | 1 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 2 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/bidi_call.rb | 3 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 27 | ||||
-rw-r--r-- | src/ruby/lib/grpc/notifier.rb | 60 | ||||
-rw-r--r-- | src/ruby/spec/generic/client_stub_spec.rb | 31 |
13 files changed, 189 insertions, 138 deletions
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs index 14add60c72..c97a3bc2b1 100644 --- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs @@ -33,6 +33,7 @@ using System; using System.Diagnostics; using System.Runtime.InteropServices; using Grpc.Core; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { @@ -180,7 +181,7 @@ namespace Grpc.Core.Internal private static void AssertCallOk(GRPCCallError callError) { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); } private static uint GetFlags(bool buffered) diff --git a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs index a59da09822..8080643d8c 100644 --- a/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs +++ b/src/csharp/Grpc.Core/Internal/ServerSafeHandle.cs @@ -35,6 +35,7 @@ using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.InteropServices; +using Grpc.Core.Utils; namespace Grpc.Core.Internal { @@ -105,9 +106,9 @@ namespace Grpc.Core.Internal grpcsharp_server_shutdown_and_notify_CALLBACK(this, callback); } - public GRPCCallError RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) + public void RequestCall(CompletionQueueSafeHandle cq, CompletionCallbackDelegate callback) { - return grpcsharp_server_request_call(this, cq, callback); + AssertCallOk(grpcsharp_server_request_call(this, cq, callback)); } protected override bool ReleaseHandle() @@ -115,5 +116,10 @@ namespace Grpc.Core.Internal grpcsharp_server_destroy(handle); return true; } + + private static void AssertCallOk(GRPCCallError callError) + { + Preconditions.CheckState(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); + } } } diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index f086fa8beb..e686cdddef 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -38,27 +38,29 @@ using System.Diagnostics; using System.Runtime.InteropServices; using System.Threading.Tasks; using Grpc.Core.Internal; +using Grpc.Core.Utils; namespace Grpc.Core { /// <summary> - /// Server is implemented only to be able to do - /// in-process testing. + /// A gRPC server. /// </summary> public class Server { - // TODO: make sure the delegate doesn't get garbage collected while + // TODO(jtattermusch) : make sure the delegate doesn't get garbage collected while // native callbacks are in the completion queue. readonly ServerShutdownCallbackDelegate serverShutdownHandler; readonly CompletionCallbackDelegate newServerRpcHandler; - readonly BlockingCollection<NewRpcInfo> newRpcQueue = new BlockingCollection<NewRpcInfo>(); readonly ServerSafeHandle handle; + readonly object myLock = new object(); readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>(); - readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>(); + bool startRequested; + bool shutdownRequested; + public Server() { this.handle = ServerSafeHandle.NewServer(GetCompletionQueue(), IntPtr.Zero); @@ -66,71 +68,81 @@ namespace Grpc.Core this.serverShutdownHandler = HandleServerShutdown; } - // only call this before Start() + /// <summary> + /// Adds a service definition to the server. This is how you register + /// handlers for a service with the server. + /// Only call this before Start(). + /// </summary> public void AddServiceDefinition(ServerServiceDefinition serviceDefinition) { - foreach (var entry in serviceDefinition.CallHandlers) + lock (myLock) { - callHandlers.Add(entry.Key, entry.Value); + Preconditions.CheckState(!startRequested); + foreach (var entry in serviceDefinition.CallHandlers) + { + callHandlers.Add(entry.Key, entry.Value); + } } } - // only call before Start() + /// <summary> + /// Add a non-secure port on which server should listen. + /// Only call this before Start(). + /// </summary> public int AddListeningPort(string addr) { - return handle.AddListeningPort(addr); - } - - // only call before Start() - public int AddListeningPort(string addr, ServerCredentials credentials) - { - using (var nativeCredentials = credentials.ToNativeCredentials()) + lock (myLock) { - return handle.AddListeningPort(addr, nativeCredentials); + Preconditions.CheckState(!startRequested); + return handle.AddListeningPort(addr); } } - public void Start() - { - handle.Start(); - - // TODO: this basically means the server is single threaded.... - StartHandlingRpcs(); - } - /// <summary> - /// Requests and handles single RPC call. + /// Add a secure port on which server should listen. + /// Only call this before Start(). /// </summary> - internal void RunRpc() + public int AddListeningPort(string addr, ServerCredentials credentials) { - AllowOneRpc(); - - try + lock (myLock) { - var rpcInfo = newRpcQueue.Take(); - - // Console.WriteLine("Server received RPC " + rpcInfo.Method); - - IServerCallHandler callHandler; - if (!callHandlers.TryGetValue(rpcInfo.Method, out callHandler)) + Preconditions.CheckState(!startRequested); + using (var nativeCredentials = credentials.ToNativeCredentials()) { - callHandler = new NoSuchMethodCallHandler(); + return handle.AddListeningPort(addr, nativeCredentials); } - callHandler.StartCall(rpcInfo.Method, rpcInfo.Call, GetCompletionQueue()); } - catch (Exception e) + } + + /// <summary> + /// Starts the server. + /// </summary> + public void Start() + { + lock (myLock) { - Console.WriteLine("Exception while handling RPC: " + e); + Preconditions.CheckState(!startRequested); + startRequested = true; + + handle.Start(); + AllowOneRpc(); } } /// <summary> /// Requests server shutdown and when there are no more calls being serviced, - /// cleans up used resources. + /// cleans up used resources. The returned task finishes when shutdown procedure + /// is complete. /// </summary> - /// <returns>The async.</returns> public async Task ShutdownAsync() { + lock (myLock) + { + Preconditions.CheckState(startRequested); + Preconditions.CheckState(!shutdownRequested); + shutdownRequested = true; + } + handle.ShutdownAndNotify(serverShutdownHandler); await shutdownTcs.Task; handle.Dispose(); @@ -152,19 +164,43 @@ namespace Grpc.Core handle.Dispose(); } - private async Task StartHandlingRpcs() + /// <summary> + /// Allows one new RPC call to be received by server. + /// </summary> + private void AllowOneRpc() { - while (true) + lock (myLock) { - await Task.Factory.StartNew(RunRpc); + if (!shutdownRequested) + { + handle.RequestCall(GetCompletionQueue(), newServerRpcHandler); + } } } - private void AllowOneRpc() + /// <summary> + /// Selects corresponding handler for given call and handles the call. + /// </summary> + private void InvokeCallHandler(CallSafeHandle call, string method) { - AssertCallOk(handle.RequestCall(GetCompletionQueue(), newServerRpcHandler)); + try + { + IServerCallHandler callHandler; + if (!callHandlers.TryGetValue(method, out callHandler)) + { + callHandler = new NoSuchMethodCallHandler(); + } + callHandler.StartCall(method, call, GetCompletionQueue()); + } + catch (Exception e) + { + Console.WriteLine("Exception while handling RPC: " + e); + } } + /// <summary> + /// Handles the native callback. + /// </summary> private void HandleNewServerRpc(GRPCOpError error, IntPtr batchContextPtr) { try @@ -176,13 +212,16 @@ namespace Grpc.Core // TODO: handle error } - var rpcInfo = new NewRpcInfo(ctx.GetServerRpcNewCall(), ctx.GetServerRpcNewMethod()); + CallSafeHandle call = ctx.GetServerRpcNewCall(); + string method = ctx.GetServerRpcNewMethod(); // after server shutdown, the callback returns with null call - if (!rpcInfo.Call.IsInvalid) + if (!call.IsInvalid) { - newRpcQueue.Add(rpcInfo); + Task.Run(() => InvokeCallHandler(call, method)); } + + AllowOneRpc(); } catch (Exception e) { @@ -190,6 +229,10 @@ namespace Grpc.Core } } + /// <summary> + /// Handles native callback. + /// </summary> + /// <param name="eventPtr"></param> private void HandleServerShutdown(IntPtr eventPtr) { try @@ -202,42 +245,9 @@ namespace Grpc.Core } } - private static void AssertCallOk(GRPCCallError callError) - { - Trace.Assert(callError == GRPCCallError.GRPC_CALL_OK, "Status not GRPC_CALL_OK"); - } - private static CompletionQueueSafeHandle GetCompletionQueue() { return GrpcEnvironment.ThreadPool.CompletionQueue; } - - private struct NewRpcInfo - { - private CallSafeHandle call; - private string method; - - public NewRpcInfo(CallSafeHandle call, string method) - { - this.call = call; - this.method = method; - } - - public CallSafeHandle Call - { - get - { - return this.call; - } - } - - public string Method - { - get - { - return this.method; - } - } - } } } diff --git a/src/csharp/Grpc.Examples.MathServer/MathServer.cs b/src/csharp/Grpc.Examples.MathServer/MathServer.cs index f7429fb43f..abc7ef05e4 100644 --- a/src/csharp/Grpc.Examples.MathServer/MathServer.cs +++ b/src/csharp/Grpc.Examples.MathServer/MathServer.cs @@ -40,7 +40,7 @@ namespace math { public static void Main(string[] args) { - String host = "0.0.0.0"; + string host = "0.0.0.0"; GrpcEnvironment.Initialize(); diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index 584bf1068d..6ae8041fb7 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -80,5 +80,7 @@ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </None> </ItemGroup> - <ItemGroup /> + <ItemGroup> + <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" /> + </ItemGroup> </Project>
\ No newline at end of file diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 6da7d3c830..f4ae6fab84 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -607,19 +607,19 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, rb_raise(grpc_rb_eCallError, "grpc_call_start_batch failed with %s (code=%d)", grpc_call_error_detail_of(err), err); - return; + return Qnil; } ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); if (ev == NULL) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); - return; + return Qnil; } if (ev->data.op_complete != GRPC_OP_OK) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eCallError, "start_batch completion failed, (code=%d)", ev->data.op_complete); - return; + return Qnil; } /* Build and return the BatchResult struct result */ diff --git a/src/ruby/grpc.gemspec b/src/ruby/grpc.gemspec index c633579102..19b3e21cb6 100755 --- a/src/ruby/grpc.gemspec +++ b/src/ruby/grpc.gemspec @@ -26,7 +26,7 @@ Gem::Specification.new do |s| s.add_dependency 'google-protobuf', '~> 3.0.0alpha.1.1' s.add_dependency 'googleauth', '~> 0.4' # reqd for interop tests - s.add_dependency 'logging', '~> 1.8' + s.add_dependency 'logging', '~> 2.0' s.add_dependency 'minitest', '~> 5.4' # reqd for interop tests s.add_development_dependency 'simplecov', '~> 0.9' diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb index b0f68035cd..80b5743e91 100644 --- a/src/ruby/lib/grpc.rb +++ b/src/ruby/lib/grpc.rb @@ -30,6 +30,7 @@ require 'grpc/errors' require 'grpc/grpc' require 'grpc/logconfig' +require 'grpc/notifier' require 'grpc/version' require 'grpc/core/time_consts' require 'grpc/generic/active_call' diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 43ba549905..947c39cd22 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -188,7 +188,7 @@ module GRPC # @param marshalled [false, true] indicates if the object is already # marshalled. def remote_send(req, marshalled = false) - logger.debug("sending #{req.inspect}, marshalled? #{marshalled}") + logger.debug("sending #{req}, marshalled? #{marshalled}") if marshalled payload = req else diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index b813ab5b54..4ca3004d6f 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -123,8 +123,7 @@ module GRPC break if req.equal?(END_OF_READS) yield req end - @loop_th.join - @enq_th.join + @enq_th.join if @enq_th.alive? end # during bidi-streaming, read the requests to send from a separate thread diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 88c24aa92b..3375fcf20a 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -54,6 +54,18 @@ module GRPC end module_function :handle_signals + # Sets up a signal handler that adds signals to the signal handling global. + # + # Signal handlers should do as little as humanly possible. + # Here, they just add themselves to $grpc_signals + # + # RpcServer (and later other parts of gRPC) monitors the signals + # $grpc_signals in its own non-signal context. + def trap_signals + %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } + end + module_function :trap_signals + # Pool is a simple thread pool. class Pool # Default keep alive period is 1s @@ -172,17 +184,6 @@ module GRPC # Signal check period is 0.25s SIGNAL_CHECK_PERIOD = 0.25 - # Sets up a signal handler that adds signals to the signal handling global. - # - # Signal handlers should do as little as humanly possible. - # Here, they just add themselves to $grpc_signals - # - # RpcServer (and later other parts of gRPC) monitors the signals - # $grpc_signals in its own non-signal context. - def self.trap_signals - %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } } - end - # setup_cq is used by #initialize to constuct a Core::CompletionQueue from # its arguments. def self.setup_cq(alt_cq) @@ -299,12 +300,12 @@ module GRPC # Runs the server in its own thread, then waits for signal INT or TERM on # the current thread to terminate it. def run_till_terminated - self.class.trap_signals + GRPC.trap_signals t = Thread.new { run } wait_till_running loop do sleep SIGNAL_CHECK_PERIOD - break unless handle_signals + break unless GRPC.handle_signals end stop t.join diff --git a/src/ruby/lib/grpc/notifier.rb b/src/ruby/lib/grpc/notifier.rb new file mode 100644 index 0000000000..caa18bbed6 --- /dev/null +++ b/src/ruby/lib/grpc/notifier.rb @@ -0,0 +1,60 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# GRPC contains the General RPC module. +module GRPC + # Notifier is useful high-level synchronization primitive. + class Notifier + attr_reader :payload, :notified + alias_method :notified?, :notified + + def initialize + @mutex = Mutex.new + @cvar = ConditionVariable.new + @notified = false + @payload = nil + end + + def wait + @mutex.synchronize do + @cvar.wait(@mutex) until notified? + end + end + + def notify(payload) + @mutex.synchronize do + return Error.new('already notified') if notified? + @payload = payload + @notified = true + @cvar.signal + return nil + end + end + end +end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index 88c6b44c22..98d68ccfbb 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -29,37 +29,8 @@ require 'grpc' -# Notifier is useful high-level synchronization primitive. -class Notifier - attr_reader :payload, :notified - alias_method :notified?, :notified - - def initialize - @mutex = Mutex.new - @cvar = ConditionVariable.new - @notified = false - @payload = nil - end - - def wait - @mutex.synchronize do - @cvar.wait(@mutex) until notified? - end - end - - def notify(payload) - @mutex.synchronize do - return Error.new('already notified') if notified? - @payload = payload - @notified = true - @cvar.signal - return nil - end - end -end - def wakey_thread(&blk) - n = Notifier.new + n = GRPC::Notifier.new t = Thread.new do blk.call(n) end |