diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/python_generator.cc | 26 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.c | 1 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/PlatformApis.cs | 11 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Internal/UnmanagedLibrary.cs | 29 | ||||
-rw-r--r-- | src/python/grpcio/grpc/__init__.py | 16 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/active_call.rb | 3 | ||||
-rw-r--r-- | src/ruby/lib/grpc/generic/rpc_server.rb | 1 | ||||
-rwxr-xr-x | src/ruby/pb/test/server.rb | 65 | ||||
-rw-r--r-- | src/ruby/qps/client.rb | 7 | ||||
-rw-r--r-- | src/ruby/qps/qps-common.rb | 16 | ||||
-rw-r--r-- | src/ruby/qps/server.rb | 14 | ||||
-rwxr-xr-x | src/ruby/qps/worker.rb | 8 |
12 files changed, 136 insertions, 61 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc index 0f61b1fb6c..6830f49931 100644 --- a/src/compiler/python_generator.cc +++ b/src/compiler/python_generator.cc @@ -234,6 +234,13 @@ bool PrintBetaServicer(const ServiceDescriptor* service, Printer* out) { service->name()); { IndentScope raii_class_indent(out); + out->Print( + "\"\"\"The Beta API is deprecated for 0.15.0 and later.\n" + "\nIt is recommended to use the GA API (classes and functions in this\n" + "file not marked beta) for all further purposes. This class was " + "generated\n" + "only to ease transition from grpcio<0.15.0 to " + "grpcio>=0.15.0.\"\"\"\n"); PrintAllComments(service, out); for (int i = 0; i < service->method_count(); ++i) { auto meth = service->method(i); @@ -256,6 +263,13 @@ bool PrintBetaStub(const ServiceDescriptor* service, Printer* out) { out->Print("class Beta$Service$Stub(object):\n", "Service", service->name()); { IndentScope raii_class_indent(out); + out->Print( + "\"\"\"The Beta API is deprecated for 0.15.0 and later.\n" + "\nIt is recommended to use the GA API (classes and functions in this\n" + "file not marked beta) for all further purposes. This class was " + "generated\n" + "only to ease transition from grpcio<0.15.0 to " + "grpcio>=0.15.0.\"\"\"\n"); PrintAllComments(service, out); for (int i = 0; i < service->method_count(); ++i) { const MethodDescriptor* meth = service->method(i); @@ -287,6 +301,12 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name, "Service", service->name()); { IndentScope raii_create_server_indent(out); + out->Print( + "\"\"\"The Beta API is deprecated for 0.15.0 and later.\n" + "\nIt is recommended to use the GA API (classes and functions in this\n" + "file not marked beta) for all further purposes. This function was\n" + "generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0" + "\"\"\"\n"); map<grpc::string, grpc::string> method_implementation_constructors; map<grpc::string, grpc::string> input_message_modules_and_classes; map<grpc::string, grpc::string> output_message_modules_and_classes; @@ -386,6 +406,12 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name, " metadata_transformer=None, pool=None, pool_size=None):\n"); { IndentScope raii_create_server_indent(out); + out->Print( + "\"\"\"The Beta API is deprecated for 0.15.0 and later.\n" + "\nIt is recommended to use the GA API (classes and functions in this\n" + "file not marked beta) for all further purposes. This function was\n" + "generated only to ease transition from grpcio<0.15.0 to grpcio>=0.15.0" + "\"\"\"\n"); map<grpc::string, grpc::string> method_cardinalities; map<grpc::string, grpc::string> input_message_modules_and_classes; map<grpc::string, grpc::string> output_message_modules_and_classes; diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 35054c42b5..448a72671c 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -319,6 +319,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + if (allocated) gpr_free(allocated); return; } diff --git a/src/csharp/Grpc.Core/Internal/PlatformApis.cs b/src/csharp/Grpc.Core/Internal/PlatformApis.cs index 15391ddc64..406204e0a7 100644 --- a/src/csharp/Grpc.Core/Internal/PlatformApis.cs +++ b/src/csharp/Grpc.Core/Internal/PlatformApis.cs @@ -50,6 +50,7 @@ namespace Grpc.Core.Internal static readonly bool isMacOSX; static readonly bool isWindows; static readonly bool isMono; + static readonly bool isNetCore; static PlatformApis() { @@ -57,6 +58,7 @@ namespace Grpc.Core.Internal isLinux = RuntimeInformation.IsOSPlatform(OSPlatform.Linux); isMacOSX = RuntimeInformation.IsOSPlatform(OSPlatform.OSX); isWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows); + isNetCore = RuntimeInformation.FrameworkDescription.StartsWith(".NET Core"); #else var platform = Environment.OSVersion.Platform; @@ -64,6 +66,7 @@ namespace Grpc.Core.Internal isMacOSX = (platform == PlatformID.Unix && GetUname() == "Darwin"); isLinux = (platform == PlatformID.Unix && !isMacOSX); isWindows = (platform == PlatformID.Win32NT || platform == PlatformID.Win32S || platform == PlatformID.Win32Windows); + isNetCore = false; #endif isMono = Type.GetType("Mono.Runtime") != null; } @@ -88,6 +91,14 @@ namespace Grpc.Core.Internal get { return isMono; } } + /// <summary> + /// true if running on .NET Core (CoreCLR), false otherwise. + /// </summary> + public static bool IsNetCore + { + get { return isNetCore; } + } + public static bool Is64Bit { get { return IntPtr.Size == 8; } diff --git a/src/csharp/Grpc.Core/Internal/UnmanagedLibrary.cs b/src/csharp/Grpc.Core/Internal/UnmanagedLibrary.cs index dc629bd714..31e1402849 100644 --- a/src/csharp/Grpc.Core/Internal/UnmanagedLibrary.cs +++ b/src/csharp/Grpc.Core/Internal/UnmanagedLibrary.cs @@ -44,10 +44,9 @@ namespace Grpc.Core.Internal { /// <summary> /// Represents a dynamically loaded unmanaged library in a (partially) platform independent manner. - /// An important difference in library loading semantics is that on Windows, once we load a dynamic library using LoadLibrary, - /// that library becomes instantly available for <c>DllImport</c> P/Invoke calls referring to the same library name. - /// On Unix systems, dlopen has somewhat different semantics, so we need to use dlsym and <c>Marshal.GetDelegateForFunctionPointer</c> - /// to obtain delegates to native methods. + /// First, the native library is loaded using dlopen (on Unix systems) or using LoadLibrary (on Windows). + /// dlsym or GetProcAddress are then used to obtain symbol addresses. <c>Marshal.GetDelegateForFunctionPointer</c> + /// transforms the addresses into delegates to native methods. /// See http://stackoverflow.com/questions/13461989/p-invoke-to-dynamically-loaded-library-on-mono. /// </summary> internal class UnmanagedLibrary @@ -114,6 +113,10 @@ namespace Grpc.Core.Internal { return Mono.dlsym(this.handle, symbolName); } + if (PlatformApis.IsNetCore) + { + return CoreCLR.dlsym(this.handle, symbolName); + } return Linux.dlsym(this.handle, symbolName); } if (PlatformApis.IsMacOSX) @@ -149,6 +152,10 @@ namespace Grpc.Core.Internal { return Mono.dlopen(libraryPath, RTLD_GLOBAL + RTLD_LAZY); } + if (PlatformApis.IsNetCore) + { + return CoreCLR.dlopen(libraryPath, RTLD_GLOBAL + RTLD_LAZY); + } return Linux.dlopen(libraryPath, RTLD_GLOBAL + RTLD_LAZY); } if (PlatformApis.IsMacOSX) @@ -215,5 +222,19 @@ namespace Grpc.Core.Internal [DllImport("__Internal")] internal static extern IntPtr dlsym(IntPtr handle, string symbol); } + + /// <summary> + /// Similarly as for Mono on Linux, we load symbols for + /// dlopen and dlsym from the "libcoreclr.so", + /// to avoid the dependency on libc-dev Linux. + /// </summary> + private static class CoreCLR + { + [DllImport("libcoreclr.so")] + internal static extern IntPtr dlopen(string filename, int flags); + + [DllImport("libcoreclr.so")] + internal static extern IntPtr dlsym(IntPtr handle, string symbol); + } } } diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index b9b662c032..526bd9e14f 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -649,6 +649,10 @@ class Channel(six.with_metaclass(abc.ABCMeta)): Args: method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the response + message. Response goes undeserialized in case None is passed. Returns: A UnaryUnaryMultiCallable value for the named unary-unary method. @@ -662,6 +666,10 @@ class Channel(six.with_metaclass(abc.ABCMeta)): Args: method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the response + message. Response goes undeserialized in case None is passed. Returns: A UnaryStreamMultiCallable value for the name unary-stream method. @@ -675,6 +683,10 @@ class Channel(six.with_metaclass(abc.ABCMeta)): Args: method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the response + message. Response goes undeserialized in case None is passed. Returns: A StreamUnaryMultiCallable value for the named stream-unary method. @@ -688,6 +700,10 @@ class Channel(six.with_metaclass(abc.ABCMeta)): Args: method: The name of the RPC method. + request_serializer: Optional behaviour for serializing the request + message. Request goes unserialized in case None is passed. + response_deserializer: Optional behaviour for deserializing the response + message. Response goes undeserialized in case None is passed. Returns: A StreamStreamMultiCallable value for the named stream-stream method. diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index 23688dc924..dfc2644c46 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -240,11 +240,8 @@ module GRPC @call.metadata = batch_result.metadata @metadata_received = true end - GRPC.logger.debug("received req: #{batch_result}") unless batch_result.nil? || batch_result.message.nil? - GRPC.logger.debug("received req.to_s: #{batch_result.message}") res = @unmarshal.call(batch_result.message) - GRPC.logger.debug("received_req (unmarshalled): #{res.inspect}") return res end GRPC.logger.debug('found nil; the final response has been sent') diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index da0f6503db..7dbcb7d479 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -304,7 +304,6 @@ module GRPC # allow the metadata to be accessed from the call an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers - GRPC.logger.debug("call md is #{an_rpc.metadata}") connect_md = nil unless @connect_md_proc.nil? connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb index 0808121661..3f1e0a1ccf 100755 --- a/src/ruby/pb/test/server.rb +++ b/src/ruby/pb/test/server.rb @@ -129,27 +129,36 @@ def nulls(l) [].pack('x' * l).force_encoding('ascii-8bit') end -# A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. -class EnumeratorQueue - extend Forwardable - def_delegators :@q, :push - - def initialize(sentinel) - @q = Queue.new - @sentinel = sentinel - end +# A FullDuplexEnumerator passes requests to a block and yields generated responses +class FullDuplexEnumerator + include Grpc::Testing + include Grpc::Testing::PayloadType + def initialize(requests) + @requests = requests + end def each_item return enum_for(:each_item) unless block_given? - loop do - r = @q.pop - break if r.equal?(@sentinel) - fail r if r.is_a? Exception - yield r + GRPC.logger.info('interop-server: started receiving') + begin + cls = StreamingOutputCallResponse + @requests.each do |req| + req.response_parameters.each do |params| + resp_size = params.size + GRPC.logger.info("read a req, response size is #{resp_size}") + yield cls.new(payload: Payload.new(type: req.response_type, + body: nulls(resp_size))) + end + end + GRPC.logger.info('interop-server: finished receiving') + rescue StandardError => e + GRPC.logger.info('interop-server: failed') + GRPC.logger.warn(e) + fail e end end end - + # A runnable implementation of the schema-specified testing service, with each # service method implemented as required by the interop testing spec. class TestTarget < Grpc::Testing::TestService::Service @@ -182,31 +191,9 @@ class TestTarget < Grpc::Testing::TestService::Service def full_duplex_call(reqs) # reqs is a lazy Enumerator of the requests sent by the client. - q = EnumeratorQueue.new(self) - cls = StreamingOutputCallResponse - Thread.new do - begin - GRPC.logger.info('interop-server: started receiving') - reqs.each do |req| - req.response_parameters.each do |params| - resp_size = params.size - GRPC.logger.info("read a req, response size is #{resp_size}") - resp = cls.new(payload: Payload.new(type: req.response_type, - body: nulls(resp_size))) - q.push(resp) - end - end - GRPC.logger.info('interop-server: finished receiving') - q.push(self) - rescue StandardError => e - GRPC.logger.info('interop-server: failed') - GRPC.logger.warn(e) - q.push(e) # share the exception with the enumerator - end - end - q.each_item + FullDuplexEnumerator.new(reqs).each_item end - + def half_duplex_call(reqs) # TODO: update with unique behaviour of the half_duplex_call if that's # ever required by any of the tests. diff --git a/src/ruby/qps/client.rb b/src/ruby/qps/client.rb index 7ed648acef..8aed866da5 100644 --- a/src/ruby/qps/client.rb +++ b/src/ruby/qps/client.rb @@ -89,12 +89,14 @@ class BenchmarkClient payload: gtp.new(type: gtpt::COMPRESSABLE, body: nulls(simple_params.req_size))) + @child_threads = [] + (0..config.client_channels-1).each do |chan| gtbss = Grpc::Testing::BenchmarkService::Stub st = config.server_targets stub = gtbss.new(st[chan % st.length], cred, **opts) (0..config.outstanding_rpcs_per_channel-1).each do |r| - Thread.new { + @child_threads << Thread.new { case config.load_params.load.to_s when 'closed_loop' waiter = nil @@ -162,5 +164,8 @@ class BenchmarkClient end def shutdown @done = true + @child_threads.each do |thread| + thread.join + end end end diff --git a/src/ruby/qps/qps-common.rb b/src/ruby/qps/qps-common.rb index 4119d600b1..4714ccfdb7 100644 --- a/src/ruby/qps/qps-common.rb +++ b/src/ruby/qps/qps-common.rb @@ -52,6 +52,7 @@ def load_test_certs files.map { |f| File.open(File.join(data_dir, f)).read } end + # A EnumeratorQueue wraps a Queue yielding the items added to it via each_item. class EnumeratorQueue extend Forwardable @@ -73,4 +74,19 @@ class EnumeratorQueue end end +# A PingPongEnumerator reads requests and responds one-by-one when enumerated +# via #each_item +class PingPongEnumerator + def initialize(reqs) + @reqs = reqs + end + def each_item + return enum_for(:each_item) unless block_given? + sr = Grpc::Testing::SimpleResponse + pl = Grpc::Testing::Payload + @reqs.each do |req| + yield sr.new(payload: pl.new(body: nulls(req.response_size))) + end + end +end diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb index cd98ee1fd9..d0c2073dd1 100644 --- a/src/ruby/qps/server.rb +++ b/src/ruby/qps/server.rb @@ -49,16 +49,7 @@ class BenchmarkServiceImpl < Grpc::Testing::BenchmarkService::Service sr.new(payload: pl.new(body: nulls(req.response_size))) end def streaming_call(reqs) - q = EnumeratorQueue.new(self) - Thread.new { - sr = Grpc::Testing::SimpleResponse - pl = Grpc::Testing::Payload - reqs.each do |req| - q.push(sr.new(payload: pl.new(body: nulls(req.response_size)))) - end - q.push(self) - } - q.each_item + PingPongEnumerator.new(reqs).each_item end end @@ -71,7 +62,8 @@ class BenchmarkServer else cred = :this_port_is_insecure end - @server = GRPC::RpcServer.new + # Make sure server can handle the large number of calls in benchmarks + @server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100) @port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred) @server.handle(BenchmarkServiceImpl.new) @start_time = Time.now diff --git a/src/ruby/qps/worker.rb b/src/ruby/qps/worker.rb index 12b8087ca0..61a0b723a3 100755 --- a/src/ruby/qps/worker.rb +++ b/src/ruby/qps/worker.rb @@ -64,8 +64,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service q.push(gtss.new(stats: bms.mark(req.mark.reset), cores: cpu_cores)) end end - q.push(self) bms.stop + q.push(self) } q.each_item end @@ -83,8 +83,8 @@ class WorkerServiceImpl < Grpc::Testing::WorkerService::Service client.mark(req.mark.reset))) end end - q.push(self) client.shutdown + q.push(self) } q.each_item end @@ -118,6 +118,10 @@ def main options['server_port'] = v end end.parse! + + # Configure any errors with client or server child threads to surface + Thread.abort_on_exception = true + s = GRPC::RpcServer.new s.add_http2_port("0.0.0.0:" + options['driver_port'].to_s, :this_port_is_insecure) |