diff options
author | Craig Tiller <ctiller@ctiller.mtv.corp.google.com> | 2015-11-16 11:34:24 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@ctiller.mtv.corp.google.com> | 2015-11-16 11:34:24 -0800 |
commit | 36da1380afab7c3f081e2008e521f0bd1d851358 (patch) | |
tree | bac946254e3f03dd74520ba1ae67e7461088c12e /src | |
parent | 892f2d372641967c5fe18c72b60393eca913c8f2 (diff) | |
parent | 118c0a0eec5cedfb4d610483c3e26dacf71bcd4a (diff) |
Merge github.com:grpc/grpc into new_op
Diffstat (limited to 'src')
27 files changed, 181 insertions, 191 deletions
diff --git a/src/core/support/time_win32.c b/src/core/support/time_win32.c index bc0586d069..623a8d9233 100644 --- a/src/core/support/time_win32.c +++ b/src/core/support/time_win32.c @@ -66,14 +66,12 @@ gpr_timespec gpr_now(gpr_clock_type clock) { now_tv.tv_nsec = now_tb.millitm * 1000000; break; case GPR_CLOCK_MONOTONIC: + case GPR_CLOCK_PRECISE: QueryPerformanceCounter(×tamp); now_dbl = (timestamp.QuadPart - g_start_time.QuadPart) * g_time_scale; now_tv.tv_sec = (time_t)now_dbl; now_tv.tv_nsec = (int)((now_dbl - (double)now_tv.tv_sec) * 1e9); break; - case GPR_CLOCK_PRECISE: - gpr_precise_clock_now(&now_tv); - break; } return now_tv; } diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc index 584a0cf8ab..c11734d737 100644 --- a/src/node/ext/channel.cc +++ b/src/node/ext/channel.cc @@ -82,7 +82,7 @@ bool ParseChannelArgs(Local<Value> args_val, return false; } grpc_channel_args *channel_args = reinterpret_cast<grpc_channel_args*>( - malloc(sizeof(channel_args))); + malloc(sizeof(grpc_channel_args))); *channel_args_ptr = channel_args; Local<Object> args_hash = Nan::To<Object>(args_val).ToLocalChecked(); Local<Array> keys = Nan::GetOwnPropertyNames(args_hash).ToLocalChecked(); diff --git a/src/node/src/client.js b/src/node/src/client.js index 3cdd550752..d57826781d 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -612,7 +612,15 @@ exports.makeClientConstructor = function(methods, serviceName) { if (!options) { options = {}; } - options['grpc.primary_user_agent'] = 'grpc-node/' + version; + /* Append the grpc-node user agent string after the application user agent + * string, and put the combination at the beginning of the user agent string + */ + if (options['grpc.primary_user_agent']) { + options['grpc.primary_user_agent'] += ' '; + } else { + options['grpc.primary_user_agent'] = ''; + } + options['grpc.primary_user_agent'] += 'grpc-node/' + version; /* Private fields use $ as a prefix instead of _ because it is an invalid * prefix of a method name */ this.$channel = new grpc.Channel(address, credentials, options); diff --git a/src/objective-c/examples/Sample/Podfile b/src/objective-c/examples/Sample/Podfile index 72308c1619..3b2f412569 100644 --- a/src/objective-c/examples/Sample/Podfile +++ b/src/objective-c/examples/Sample/Podfile @@ -1,8 +1,9 @@ source 'https://github.com/CocoaPods/Specs.git' platform :ios, '8.0' +pod 'Protobuf', :path => "../../../../third_party/protobuf" pod 'gRPC', :path => "../../../.." -pod 'RemoteTest', :path => "../../generated_libraries/RemoteTestClient" +pod 'RemoteTest', :path => "../RemoteTestClient" target 'Sample' do end diff --git a/src/objective-c/examples/Sample/Sample/ViewController.m b/src/objective-c/examples/Sample/Sample/ViewController.m index 05bd6fa2db..3d634a340d 100644 --- a/src/objective-c/examples/Sample/Sample/ViewController.m +++ b/src/objective-c/examples/Sample/Sample/ViewController.m @@ -34,7 +34,7 @@ #import "ViewController.h" #import <GRPCClient/GRPCCall.h> -#import <GRPCClient/GRPCMethodName.h> +#import <ProtoRPC/ProtoMethod.h> #import <RemoteTest/Messages.pbobjc.h> #import <RemoteTest/Test.pbrpc.h> #import <RxLibrary/GRXWriter+Immediate.h> @@ -66,14 +66,14 @@ // Same example call using the generic gRPC client library: - GRPCMethodName *method = [[GRPCMethodName alloc] initWithPackage:@"grpc.testing" - interface:@"TestService" - method:@"UnaryCall"]; + ProtoMethod *method = [[ProtoMethod alloc] initWithPackage:@"grpc.testing" + service:@"TestService" + method:@"UnaryCall"]; - id<GRXWriter> requestsWriter = [GRXWriter writerWithValue:[request data]]; + GRXWriter *requestsWriter = [GRXWriter writerWithValue:[request data]]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kRemoteHost - method:method + path:method.HTTPPath requestsWriter:requestsWriter]; id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { diff --git a/src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec b/src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec deleted file mode 100644 index 23ccffe69d..0000000000 --- a/src/objective-c/generated_libraries/RouteGuideClient/RouteGuide.podspec +++ /dev/null @@ -1,31 +0,0 @@ -Pod::Spec.new do |s| - s.name = "RouteGuide" - s.version = "0.0.1" - s.license = "New BSD" - - s.ios.deployment_target = "6.0" - s.osx.deployment_target = "10.8" - - # Run protoc with the Objective-C and gRPC plugins to generate protocol messages and gRPC clients. - s.prepare_command = <<-CMD - BINDIR=../../../../bins/$CONFIG - PROTOC=$BINDIR/protobuf/protoc - PLUGIN=$BINDIR/grpc_objective_c_plugin - $PROTOC --plugin=protoc-gen-grpc=$PLUGIN --objc_out=. --grpc_out=. *.proto - CMD - - s.subspec "Messages" do |ms| - ms.source_files = "*.pbobjc.{h,m}" - ms.header_mappings_dir = "." - ms.requires_arc = false - ms.dependency "Protobuf", "~> 3.0.0-alpha-3" - end - - s.subspec "Services" do |ss| - ss.source_files = "*.pbrpc.{h,m}" - ss.header_mappings_dir = "." - ss.requires_arc = true - ss.dependency "gRPC", "~> 0.5" - ss.dependency "#{s.name}/Messages" - end -end diff --git a/src/objective-c/generated_libraries/RouteGuideClient/route_guide.proto b/src/objective-c/generated_libraries/RouteGuideClient/route_guide.proto deleted file mode 100644 index 19592e2ebd..0000000000 --- a/src/objective-c/generated_libraries/RouteGuideClient/route_guide.proto +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -syntax = "proto3"; - -package routeguide; - -option objc_class_prefix = "RGD"; - -// Interface exported by the server. -service RouteGuide { - // A simple RPC. - // - // Obtains the feature at a given position. - rpc GetFeature(Point) returns (Feature) {} - - // A server-to-client streaming RPC. - // - // Obtains the Features available within the given Rectangle. Results are - // streamed rather than returned at once (e.g. in a response message with a - // repeated field), as the rectangle may cover a large area and contain a - // huge number of features. - rpc ListFeatures(Rectangle) returns (stream Feature) {} - - // A client-to-server streaming RPC. - // - // Accepts a stream of Points on a route being traversed, returning a - // RouteSummary when traversal is completed. - rpc RecordRoute(stream Point) returns (RouteSummary) {} - - // A Bidirectional streaming RPC. - // - // Accepts a stream of RouteNotes sent while a route is being traversed, - // while receiving other RouteNotes (e.g. from other users). - rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} -} - -// Points are represented as latitude-longitude pairs in the E7 representation -// (degrees multiplied by 10**7 and rounded to the nearest integer). -// Latitudes should be in the range +/- 90 degrees and longitude should be in -// the range +/- 180 degrees (inclusive). -message Point { - int32 latitude = 1; - int32 longitude = 2; -} - -// A latitude-longitude rectangle, represented as two diagonally opposite -// points "lo" and "hi". -message Rectangle { - // One corner of the rectangle. - Point lo = 1; - - // The other corner of the rectangle. - Point hi = 2; -} - -// A feature names something at a given point. -// -// If a feature could not be named, the name is empty. -message Feature { - // The name of the feature. - string name = 1; - - // The point where the feature is detected. - Point location = 2; -} - -// A RouteNote is a message sent while at a given point. -message RouteNote { - // The location from which the message is sent. - Point location = 1; - - // The message to be sent. - string message = 2; -} - -// A RouteSummary is received in response to a RecordRoute rpc. -// -// It contains the number of individual points received, the number of -// detected features, and the total distance covered as the cumulative sum of -// the distance between each point. -message RouteSummary { - // The number of points received. - int32 point_count = 1; - - // The number of known features passed while traversing the route. - int32 feature_count = 2; - - // The distance covered in metres. - int32 distance = 3; - - // The duration of the traversal in seconds. - int32 elapsed_time = 4; -} diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 2a9b894cf6..cab608d37f 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -3,8 +3,7 @@ platform :ios, '8.0' pod 'Protobuf', :path => "../../../third_party/protobuf" pod 'gRPC', :path => "../../.." -pod 'RemoteTest', :path => "../generated_libraries/RemoteTestClient" -pod 'RouteGuide', :path => "../generated_libraries/RouteGuideClient" +pod 'RemoteTest', :path => "RemoteTestClient" link_with 'AllTests', 'RxLibraryUnitTests', diff --git a/src/objective-c/generated_libraries/RemoteTestClient/RemoteTest.podspec b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec index 8710753e59..8710753e59 100644 --- a/src/objective-c/generated_libraries/RemoteTestClient/RemoteTest.podspec +++ b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec diff --git a/src/objective-c/generated_libraries/RemoteTestClient/empty.proto b/src/objective-c/tests/RemoteTestClient/empty.proto index a678048289..a678048289 100644 --- a/src/objective-c/generated_libraries/RemoteTestClient/empty.proto +++ b/src/objective-c/tests/RemoteTestClient/empty.proto diff --git a/src/objective-c/generated_libraries/RemoteTestClient/messages.proto b/src/objective-c/tests/RemoteTestClient/messages.proto index 85d93c2ff9..85d93c2ff9 100644 --- a/src/objective-c/generated_libraries/RemoteTestClient/messages.proto +++ b/src/objective-c/tests/RemoteTestClient/messages.proto diff --git a/src/objective-c/generated_libraries/RemoteTestClient/test.proto b/src/objective-c/tests/RemoteTestClient/test.proto index 514c3b8095..514c3b8095 100644 --- a/src/objective-c/generated_libraries/RemoteTestClient/test.proto +++ b/src/objective-c/tests/RemoteTestClient/test.proto diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index c26be607ff..aa4de349ea 100755 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -51,6 +51,7 @@ class BaseStub * @param $opts array * - 'update_metadata': (optional) a callback function which takes in a * metadata array, and returns an updated metadata array + * - 'grpc.primary_user_agent': (optional) a user-agent string */ public function __construct($hostname, $opts) { @@ -64,7 +65,12 @@ class BaseStub } $package_config = json_decode( file_get_contents(dirname(__FILE__).'/../../composer.json'), true); - $opts['grpc.primary_user_agent'] = + if (!empty($opts['grpc.primary_user_agent'])) { + $opts['grpc.primary_user_agent'] .= ' '; + } else { + $opts['grpc.primary_user_agent'] = ''; + } + $opts['grpc.primary_user_agent'] .= 'grpc-php/'.$package_config['version']; $this->channel = new Channel($hostname, $opts); } diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 6b5beb6f5d..40364328ee 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -139,7 +139,15 @@ static const rb_data_type_t grpc_rb_md_ary_data_type = { {NULL, NULL}}, NULL, NULL, - 0}; +#ifdef RUBY_TYPED_FREE_IMMEDIATELY + /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because + * grpc_rb_call_destroy + * touches a hash object. + * TODO(yugui) Directly use st_table and call the free function earlier? + */ + 0, +#endif +}; /* Describes grpc_call struct for RTypedData */ static const rb_data_type_t grpc_call_data_type = { @@ -148,12 +156,15 @@ static const rb_data_type_t grpc_call_data_type = { {NULL, NULL}}, NULL, NULL, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY /* it is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because * grpc_rb_call_destroy * touches a hash object. * TODO(yugui) Directly use st_table and call the free function earlier? */ - 0}; + 0, +#endif +}; /* Error code details is a hash containing text strings describing errors */ VALUE rb_error_code_details; diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 90afdc3fe1..cd0b966e72 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -111,7 +111,9 @@ static rb_data_type_t grpc_channel_data_type = { {grpc_rb_channel_mark, grpc_rb_channel_free, GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, NULL, NULL, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY RUBY_TYPED_FREE_IMMEDIATELY +#endif }; /* Allocates grpc_rb_channel instances. */ diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c index 1ba30b69aa..37dd981925 100644 --- a/src/ruby/ext/grpc/rb_channel_args.c +++ b/src/ruby/ext/grpc/rb_channel_args.c @@ -44,7 +44,9 @@ static rb_data_type_t grpc_rb_channel_args_data_type = { {GRPC_RB_GC_NOT_MARKED, GRPC_RB_GC_DONT_FREE, GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, NULL, NULL, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY RUBY_TYPED_FREE_IMMEDIATELY +#endif }; /* A callback the processes the hash key values in channel_args hash */ diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c index 0bc9eb2a97..a7de96d718 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.c +++ b/src/ruby/ext/grpc/rb_completion_queue.c @@ -121,9 +121,11 @@ static rb_data_type_t grpc_rb_completion_queue_data_type = { {GRPC_RB_GC_NOT_MARKED, grpc_rb_completion_queue_destroy, GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, NULL, NULL, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY /* cannot immediately free because grpc_rb_completion_queue_shutdown_drain * calls rb_thread_call_without_gvl. */ - 0 + 0, +#endif }; /* Allocates a completion queue. */ diff --git a/src/ruby/ext/grpc/rb_credentials.c b/src/ruby/ext/grpc/rb_credentials.c index ae757f6986..486ff79f91 100644 --- a/src/ruby/ext/grpc/rb_credentials.c +++ b/src/ruby/ext/grpc/rb_credentials.c @@ -92,7 +92,10 @@ static rb_data_type_t grpc_rb_credentials_data_type = { GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, NULL, NULL, - RUBY_TYPED_FREE_IMMEDIATELY}; +#ifdef RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY +#endif +}; /* Allocates Credential instances. Provides safe initial defaults for the instance fields. */ diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index 327fd1a4fc..33f48779d8 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -55,7 +55,10 @@ static rb_data_type_t grpc_rb_timespec_data_type = { {NULL, NULL}}, NULL, NULL, - RUBY_TYPED_FREE_IMMEDIATELY}; +#ifdef RUBY_TYPED_FREE_IMMEDIATELY + RUBY_TYPED_FREE_IMMEDIATELY +#endif +}; /* Alloc func that blocks allocation of a given object by raising an * exception. */ @@ -262,10 +265,20 @@ static void Init_grpc_time_consts() { id_tv_nsec = rb_intern("tv_nsec"); } +/* + TODO: find an alternative to ruby_vm_at_exit that is ok in Ruby 2.0 where + RUBY_TYPED_FREE_IMMEDIATELY is not defined. + + At the moment, registering a function using ruby_vm_at_exit segfaults in Ruby + 2.0. This is not an issue with the gRPC handler. More likely, this was an + in issue with 2.0 that got resolved in 2.1 and has not been backported. +*/ +#ifdef RUBY_TYPED_FREE_IMMEDIATELY static void grpc_rb_shutdown(ruby_vm_t *vm) { (void)vm; grpc_shutdown(); } +#endif /* Initialize the GRPC module structs */ @@ -285,7 +298,12 @@ VALUE sym_metadata = Qundef; void Init_grpc() { grpc_init(); + +/* TODO: find alternative to ruby_vm_at_exit that is ok in Ruby 2.0 */ +#ifdef RUBY_TYPED_FREE_IMMEDIATELY ruby_vm_at_exit(grpc_rb_shutdown); +#endif + grpc_rb_mGRPC = rb_define_module("GRPC"); grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core"); grpc_rb_sNewServerRpc = diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 4469658869..ebdd7e1a34 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -101,11 +101,14 @@ static const rb_data_type_t grpc_rb_server_data_type = { {NULL, NULL}}, NULL, NULL, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free function would block * and we might want to unlock GVL * TODO(yugui) Unlock GVL? */ - 0}; + 0, +#endif +}; /* Allocates grpc_rb_server instances. */ static VALUE grpc_rb_server_alloc(VALUE cls) { diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c index ea4d0d864e..de57585e0b 100644 --- a/src/ruby/ext/grpc/rb_server_credentials.c +++ b/src/ruby/ext/grpc/rb_server_credentials.c @@ -91,7 +91,9 @@ static const rb_data_type_t grpc_rb_server_credentials_data_type = { {grpc_rb_server_credentials_mark, grpc_rb_server_credentials_free, GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}}, NULL, NULL, +#ifdef RUBY_TYPED_FREE_IMMEDIATELY RUBY_TYPED_FREE_IMMEDIATELY +#endif }; /* Allocates ServerCredential instances. diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb index d9cb924735..e80d24edc9 100644 --- a/src/ruby/lib/grpc/generic/active_call.rb +++ b/src/ruby/lib/grpc/generic/active_call.rb @@ -199,11 +199,7 @@ module GRPC # marshalled. def remote_send(req, marshalled = false) GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}") - if marshalled - payload = req - else - payload = @marshal.call(req) - end + payload = marshalled ? req : @marshal.call(req) @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload) end @@ -417,7 +413,9 @@ module GRPC # @return [Enumerator, nil] a response Enumerator def bidi_streamer(requests, **kw, &blk) start_call(**kw) unless @started - bd = BidiCall.new(@call, @cq, @marshal, @unmarshal) + bd = BidiCall.new(@call, @cq, @marshal, @unmarshal, + metadata_tag: @metadata_tag) + @metadata_tag = nil # run_on_client ensures metadata is read bd.run_on_client(requests, @op_notifier, &blk) end diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 9dbbb74caf..6b9b785693 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -56,7 +56,8 @@ module GRPC # the call # @param marshal [Function] f(obj)->string that marshal requests # @param unmarshal [Function] f(string)->obj that unmarshals responses - def initialize(call, q, marshal, unmarshal) + # @param metadata_tag [Object] tag object used to collect metadata + def initialize(call, q, marshal, unmarshal, metadata_tag: nil) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') @@ -67,6 +68,7 @@ module GRPC @op_notifier = nil # signals completion on clients @readq = Queue.new @unmarshal = unmarshal + @metadata_tag = metadata_tag end # Begins orchestration of the Bidi stream for a client sending requests. @@ -113,6 +115,18 @@ module GRPC @op_notifier.notify(self) end + # performs a read using @call.run_batch, ensures metadata is set up + def read_using_run_batch + ops = { RECV_MESSAGE => nil } + ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil? + batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) + unless @metadata_tag.nil? + @call.metadata = batch_result.metadata + @metadata_tag = nil + end + batch_result + end + # each_queued_msg yields each message on this instances readq # # - messages are added to the readq by #read_loop @@ -169,9 +183,7 @@ module GRPC loop do GRPC.logger.debug("bidi-read-loop: #{count}") count += 1 - # TODO: ensure metadata is read if available, currently it's not - batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, - RECV_MESSAGE => nil) + batch_result = read_using_run_batch # handle the next message if batch_result.message.nil? diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 228c500672..0e318bd53b 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -418,11 +418,11 @@ module GRPC an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE) break if (!an_rpc.nil?) && an_rpc.call.nil? - c = new_active_server_call(an_rpc) - unless c.nil? - mth = an_rpc.method.to_sym - @pool.schedule(c) do |call| - rpc_descs[mth].run_server_method(call, rpc_handlers[mth]) + active_call = new_active_server_call(an_rpc) + unless active_call.nil? + @pool.schedule(active_call) do |ac| + c, mth = ac + rpc_descs[mth].run_server_method(c, rpc_handlers[mth]) end end rescue Core::CallError, RuntimeError => e @@ -442,6 +442,7 @@ module GRPC # allow the metadata to be accessed from the call handle_call_tag = Object.new an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers + GRPC.logger.debug("call md is #{an_rpc.metadata}") connect_md = nil unless @connect_md_proc.nil? connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) @@ -454,9 +455,11 @@ module GRPC # Create the ActiveCall GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym] - ActiveCall.new(an_rpc.call, @cq, - rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), - an_rpc.deadline) + c = ActiveCall.new(an_rpc.call, @cq, + rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), + an_rpc.deadline) + mth = an_rpc.method.to_sym + [c, mth] end protected diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb index 1388685734..b84cd43090 100755 --- a/src/ruby/pb/test/client.rb +++ b/src/ruby/pb/test/client.rb @@ -46,6 +46,7 @@ $LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) require 'optparse' +require 'logger' require 'grpc' require 'googleauth' @@ -59,6 +60,22 @@ require 'signet/ssl_config' AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR +# RubyLogger defines a logger for gRPC based on the standard ruby logger. +module RubyLogger + def logger + LOGGER + end + + LOGGER = Logger.new(STDOUT) + LOGGER.level = Logger::INFO +end + +# GRPC is the general RPC module +module GRPC + # Inject the noop #logger if no module-level logger method has been injected. + extend RubyLogger +end + # AssertionError is use to indicate interop test failures. class AssertionError < RuntimeError; end diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb index 25c1b1e9e6..67877a191f 100755 --- a/src/ruby/pb/test/server.rb +++ b/src/ruby/pb/test/server.rb @@ -45,6 +45,7 @@ $LOAD_PATH.unshift(pb_dir) unless $LOAD_PATH.include?(pb_dir) $LOAD_PATH.unshift(this_dir) unless $LOAD_PATH.include?(this_dir) require 'forwardable' +require 'logger' require 'optparse' require 'grpc' @@ -53,6 +54,60 @@ require 'test/proto/empty' require 'test/proto/messages' require 'test/proto/test_services' +# DebugIsTruncated extends the default Logger to truncate debug messages +class DebugIsTruncated < Logger + def debug(s) + super(truncate(s, 1024)) + end + + # Truncates a given +text+ after a given <tt>length</tt> if +text+ is longer than <tt>length</tt>: + # + # 'Once upon a time in a world far far away'.truncate(27) + # # => "Once upon a time in a wo..." + # + # Pass a string or regexp <tt>:separator</tt> to truncate +text+ at a natural break: + # + # 'Once upon a time in a world far far away'.truncate(27, separator: ' ') + # # => "Once upon a time in a..." + # + # 'Once upon a time in a world far far away'.truncate(27, separator: /\s/) + # # => "Once upon a time in a..." + # + # The last characters will be replaced with the <tt>:omission</tt> string (defaults to "...") + # for a total length not exceeding <tt>length</tt>: + # + # 'And they found that many people were sleeping better.'.truncate(25, omission: '... (continued)') + # # => "And they f... (continued)" + def truncate(s, truncate_at, options = {}) + return s unless s.length > truncate_at + omission = options[:omission] || '...' + with_extra_room = truncate_at - omission.length + stop = \ + if options[:separator] + rindex(options[:separator], with_extra_room) || with_extra_room + else + with_extra_room + end + "#{s[0, stop]}#{omission}" + end +end + +# RubyLogger defines a logger for gRPC based on the standard ruby logger. +module RubyLogger + def logger + LOGGER + end + + LOGGER = DebugIsTruncated.new(STDOUT) + LOGGER.level = Logger::WARN +end + +# GRPC is the general RPC module +module GRPC + # Inject the noop #logger if no module-level logger method has been injected. + extend RubyLogger +end + # loads the certificates by the test server. def load_test_certs this_dir = File.expand_path(File.dirname(__FILE__)) @@ -113,7 +168,7 @@ class TestTarget < Grpc::Testing::TestService::Service def streaming_input_call(call) sizes = call.each_remote_read.map { |x| x.payload.body.length } - sum = sizes.inject { |s, x| s + x } + sum = sizes.inject(0) { |s, x| s + x } StreamingInputCallResponse.new(aggregated_payload_size: sum) end diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb index 9bc82638c7..322566b784 100644 --- a/src/ruby/spec/pb/health/checker_spec.rb +++ b/src/ruby/spec/pb/health/checker_spec.rb @@ -31,6 +31,7 @@ require 'grpc' require 'grpc/health/v1alpha/health' require 'grpc/health/checker' require 'open3' +require 'tmpdir' def can_run_codegen_check system('which grpc_ruby_plugin') && system('which protoc') |