aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2016-11-02 12:06:57 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2016-11-02 12:06:57 -0700
commit1c0b7a2d180f44012b302eb17ce103f6b4d45752 (patch)
tree977457c5f4aac78f1e364d7e64adf5f0e2e8b869 /src/ruby
parent68413c221e341f4f00326e892077c5fcd4b4f788 (diff)
parent11948f74414e6c95b81fbcc2f0d06afa0b1cbce5 (diff)
Merge remote-tracking branch 'upstream/master' into get_tos
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c13
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c10
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h15
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb146
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb12
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb43
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb3
-rw-r--r--src/ruby/qps/server.rb4
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb22
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb176
-rw-r--r--src/ruby/spec/generic/rpc_desc_spec.rb24
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb1
-rw-r--r--src/ruby/spec/pb/health/checker_spec.rb38
13 files changed, 357 insertions, 150 deletions
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 9b6675da84..280f21c973 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -86,19 +86,16 @@ static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
rb_funcall(exception_object, rb_intern("backtrace"), 0),
rb_intern("join"),
1, rb_str_new2("\n\tfrom "));
- VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0);
- const char *exception_classname = rb_obj_classname(exception_object);
+ VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("inspect"), 0);
(void)args;
- gpr_log(GPR_INFO, "Call credentials callback failed: %s: %s\n%s",
- exception_classname, StringValueCStr(rb_exception_info),
+ gpr_log(GPR_INFO, "Call credentials callback failed: %s\n%s",
+ StringValueCStr(rb_exception_info),
StringValueCStr(backtrace));
rb_hash_aset(result, rb_str_new2("metadata"), Qnil);
- /* Currently only gives the exception class name. It should be possible get
- more details */
rb_hash_aset(result, rb_str_new2("status"),
- INT2NUM(GRPC_STATUS_PERMISSION_DENIED));
+ INT2NUM(GRPC_STATUS_UNAUTHENTICATED));
rb_hash_aset(result, rb_str_new2("details"),
- rb_str_new2(exception_classname));
+ rb_exception_info);
return result;
}
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index a6cad0db1a..fd73cc7970 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -132,6 +132,11 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_resource_quota_create_type grpc_resource_quota_create_import;
+grpc_resource_quota_ref_type grpc_resource_quota_ref_import;
+grpc_resource_quota_unref_type grpc_resource_quota_unref_import;
+grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
+grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
grpc_use_signal_type grpc_use_signal_import;
@@ -401,6 +406,11 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_resource_quota_create_import = (grpc_resource_quota_create_type) GetProcAddress(library, "grpc_resource_quota_create");
+ grpc_resource_quota_ref_import = (grpc_resource_quota_ref_type) GetProcAddress(library, "grpc_resource_quota_ref");
+ grpc_resource_quota_unref_import = (grpc_resource_quota_unref_type) GetProcAddress(library, "grpc_resource_quota_unref");
+ grpc_resource_quota_resize_import = (grpc_resource_quota_resize_type) GetProcAddress(library, "grpc_resource_quota_resize");
+ grpc_resource_quota_arg_vtable_import = (grpc_resource_quota_arg_vtable_type) GetProcAddress(library, "grpc_resource_quota_arg_vtable");
grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd");
grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 00a67b0b2c..c2244150f2 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -347,6 +347,21 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_resource_quota *(*grpc_resource_quota_create_type)(const char *trace_name);
+extern grpc_resource_quota_create_type grpc_resource_quota_create_import;
+#define grpc_resource_quota_create grpc_resource_quota_create_import
+typedef void(*grpc_resource_quota_ref_type)(grpc_resource_quota *resource_quota);
+extern grpc_resource_quota_ref_type grpc_resource_quota_ref_import;
+#define grpc_resource_quota_ref grpc_resource_quota_ref_import
+typedef void(*grpc_resource_quota_unref_type)(grpc_resource_quota *resource_quota);
+extern grpc_resource_quota_unref_type grpc_resource_quota_unref_import;
+#define grpc_resource_quota_unref grpc_resource_quota_unref_import
+typedef void(*grpc_resource_quota_resize_type)(grpc_resource_quota *resource_quota, size_t new_size);
+extern grpc_resource_quota_resize_type grpc_resource_quota_resize_import;
+#define grpc_resource_quota_resize grpc_resource_quota_resize_import
+typedef const grpc_arg_pointer_vtable *(*grpc_resource_quota_arg_vtable_type)(void);
+extern grpc_resource_quota_arg_vtable_type grpc_resource_quota_arg_vtable_import;
+#define grpc_resource_quota_arg_vtable grpc_resource_quota_arg_vtable_import
typedef grpc_channel *(*grpc_insecure_channel_create_from_fd_type)(const char *target, int fd, const grpc_channel_args *args);
extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
#define grpc_insecure_channel_create_from_fd grpc_insecure_channel_create_from_fd_import
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index dfc2644c46..f5c426ebfc 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -43,7 +43,8 @@ class Struct
GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present.
md = status.metadata
- fail GRPC::BadStatus.new(status.code, status.details, md)
+ fail GRPC::BadStatus.new(status.code, status.details, md),
+ "status code: #{status.code}, details: #{status.details}"
end
status
end
@@ -156,41 +157,25 @@ module GRPC
Operation.new(self)
end
- # writes_done indicates that all writes are completed.
- #
- # It blocks until the remote endpoint acknowledges with at status unless
- # assert_finished is set to false. Any calls to #remote_send after this
- # call will fail.
- #
- # @param assert_finished [true, false] when true(default), waits for
- # FINISHED.
- def writes_done(assert_finished = true)
- ops = {
- SEND_CLOSE_FROM_CLIENT => nil
- }
- ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
- batch_result = @call.run_batch(ops)
- return unless assert_finished
- unless batch_result.status.nil?
- @call.trailing_metadata = batch_result.status.metadata
- end
- @call.status = batch_result.status
- op_is_done
- batch_result.check_status
- end
-
# finished waits until a client call is completed.
#
# It blocks until the remote endpoint acknowledges by sending a status.
def finished
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
- unless batch_result.status.nil?
- @call.trailing_metadata = batch_result.status.metadata
+ attach_status_results_and_complete_call(batch_result)
+ end
+
+ def attach_status_results_and_complete_call(recv_status_batch_result)
+ unless recv_status_batch_result.status.nil?
+ @call.trailing_metadata = recv_status_batch_result.status.metadata
end
- @call.status = batch_result.status
- op_is_done
- batch_result.check_status
+ @call.status = recv_status_batch_result.status
@call.close
+ op_is_done
+
+ # The RECV_STATUS in run_batch always succeeds
+ # Check the status for a bad status or failed run batch
+ recv_status_batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
@@ -226,6 +211,23 @@ module GRPC
nil
end
+ def server_unary_response(req, trailing_metadata: {},
+ code: Core::StatusCodes::OK, details: 'OK')
+ ops = {}
+ @send_initial_md_mutex.synchronize do
+ ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
+ @metadata_sent = true
+ end
+
+ payload = @marshal.call(req)
+ ops[SEND_MESSAGE] = payload
+ ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
+ code, details, trailing_metadata)
+ ops[RECV_CLOSE_ON_SERVER] = nil
+
+ @call.run_batch(ops)
+ end
+
# remote_read reads a response from the remote endpoint.
#
# It blocks until the remote endpoint replies with a message or status.
@@ -240,9 +242,13 @@ module GRPC
@call.metadata = batch_result.metadata
@metadata_received = true
end
- unless batch_result.nil? || batch_result.message.nil?
- res = @unmarshal.call(batch_result.message)
- return res
+ get_message_from_batch_result(batch_result)
+ end
+
+ def get_message_from_batch_result(recv_message_batch_result)
+ unless recv_message_batch_result.nil? ||
+ recv_message_batch_result.message.nil?
+ return @unmarshal.call(recv_message_batch_result.message)
end
GRPC.logger.debug('found nil; the final response has been sent')
nil
@@ -298,7 +304,6 @@ module GRPC
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
- break if resp.is_a? Struct::Status # is an OK status
if resp.nil? # the last response was received, but not finished yet
finished
break
@@ -315,15 +320,25 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
- merge_metadata_to_send(metadata) && send_initial_metadata
- remote_send(req)
- writes_done(false)
- response = remote_read
- finished unless response.is_a? Struct::Status
- response
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
+ ops = {
+ SEND_MESSAGE => @marshal.call(req),
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_INITIAL_METADATA => nil,
+ RECV_MESSAGE => nil,
+ RECV_STATUS_ON_CLIENT => nil
+ }
+ @send_initial_md_mutex.synchronize do
+ # Metadata might have already been sent if this is an operation view
+ unless @metadata_sent
+ ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
+ end
+ @metadata_sent = true
+ end
+ batch_result = @call.run_batch(ops)
+
+ @call.metadata = batch_result.metadata
+ attach_status_results_and_complete_call(batch_result)
+ get_message_from_batch_result(batch_result)
end
# client_streamer sends a stream of requests to a GRPC server, and
@@ -339,12 +354,20 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
- merge_metadata_to_send(metadata) && send_initial_metadata
- requests.each { |r| remote_send(r) }
- writes_done(false)
- response = remote_read
- finished unless response.is_a? Struct::Status
- response
+ # Metadata might have already been sent if this is an operation view
+ merge_metadata_and_send_if_not_already_sent(metadata)
+
+ requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
+ batch_result = @call.run_batch(
+ SEND_CLOSE_FROM_CLIENT => nil,
+ RECV_INITIAL_METADATA => nil,
+ RECV_MESSAGE => nil,
+ RECV_STATUS_ON_CLIENT => nil
+ )
+
+ @call.metadata = batch_result.metadata
+ attach_status_results_and_complete_call(batch_result)
+ get_message_from_batch_result(batch_result)
rescue GRPC::Core::CallError => e
finished # checks for Cancelled
raise e
@@ -365,9 +388,18 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {})
- merge_metadata_to_send(metadata) && send_initial_metadata
- remote_send(req)
- writes_done(false)
+ ops = {
+ SEND_MESSAGE => @marshal.call(req),
+ SEND_CLOSE_FROM_CLIENT => nil
+ }
+ @send_initial_md_mutex.synchronize do
+ # Metadata might have already been sent if this is an operation view
+ unless @metadata_sent
+ ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
+ end
+ @metadata_sent = true
+ end
+ @call.run_batch(ops)
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
@@ -404,7 +436,8 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
- merge_metadata_to_send(metadata) && send_initial_metadata
+ # Metadata might have already been sent if this is an operation view
+ merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call,
@marshal,
@unmarshal,
@@ -457,6 +490,15 @@ module GRPC
end
end
+ def merge_metadata_and_send_if_not_already_sent(new_metadata = {})
+ @send_initial_md_mutex.synchronize do
+ return if @metadata_sent
+ @metadata_to_send.merge!(new_metadata)
+ @call.run_batch(SEND_INITIAL_METADATA => @metadata_to_send)
+ @metadata_sent = true
+ end
+ end
+
private
# Starts the call if not already started
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 0d7c1f7805..6934257cbc 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -168,6 +168,7 @@ module GRPC
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #request_response.
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
c.request_response(req, metadata: metadata)
@@ -231,9 +232,10 @@ module GRPC
# return the operation view of the active_call; define #execute as a
# new method for this instance that invokes #client_streamer.
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
- c.client_streamer(requests, metadata: metadata)
+ c.client_streamer(requests)
end
op
end
@@ -309,9 +311,10 @@ module GRPC
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #server_streamer
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
- c.server_streamer(req, metadata: metadata, &blk)
+ c.server_streamer(req, &blk)
end
op
end
@@ -417,15 +420,15 @@ module GRPC
deadline: deadline,
parent: parent,
credentials: credentials)
-
return c.bidi_streamer(requests, metadata: metadata,
&blk) unless return_op
# return the operation view of the active_call; define #execute
# as a new method for this instance that invokes #bidi_streamer
+ c.merge_metadata_to_send(metadata)
op = c.operation
op.define_singleton_method(:execute) do
- c.bidi_streamer(requests, metadata: metadata, &blk)
+ c.bidi_streamer(requests, &blk)
end
op
end
@@ -445,7 +448,6 @@ module GRPC
deadline: nil,
parent: nil,
credentials: nil)
-
deadline = from_relative_time(@timeout) if deadline.nil?
# Provide each new client call with its own completion queue
call = @ch.create_call(parent, # parent call
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 584fe78169..cd17aed8e7 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -62,25 +62,44 @@ module GRPC
proc { |o| unmarshal_class.method(unmarshal_method).call(o) }
end
+ def handle_request_response(active_call, mth)
+ req = active_call.remote_read
+ resp = mth.call(req, active_call.single_req_view)
+ active_call.server_unary_response(
+ resp, trailing_metadata: active_call.output_metadata)
+ end
+
+ def handle_client_streamer(active_call, mth)
+ resp = mth.call(active_call.multi_req_view)
+ active_call.server_unary_response(
+ resp, trailing_metadata: active_call.output_metadata)
+ end
+
+ def handle_server_streamer(active_call, mth)
+ req = active_call.remote_read
+ replys = mth.call(req, active_call.single_req_view)
+ replys.each { |r| active_call.remote_send(r) }
+ send_status(active_call, OK, 'OK', active_call.output_metadata)
+ end
+
+ def handle_bidi_streamer(active_call, mth)
+ active_call.run_server_bidi(mth)
+ send_status(active_call, OK, 'OK', active_call.output_metadata)
+ end
+
def run_server_method(active_call, mth)
# While a server method is running, it might be cancelled, its deadline
# might be reached, the handler could throw an unknown error, or a
# well-behaved handler could throw a StatusError.
if request_response?
- req = active_call.remote_read
- resp = mth.call(req, active_call.single_req_view)
- active_call.remote_send(resp)
+ handle_request_response(active_call, mth)
elsif client_streamer?
- resp = mth.call(active_call.multi_req_view)
- active_call.remote_send(resp)
+ handle_client_streamer(active_call, mth)
elsif server_streamer?
- req = active_call.remote_read
- replys = mth.call(req, active_call.single_req_view)
- replys.each { |r| active_call.remote_send(r) }
+ handle_server_streamer(active_call, mth)
else # is a bidi_stream
- active_call.run_server_bidi(mth)
+ handle_bidi_streamer(active_call, mth)
end
- send_status(active_call, OK, 'OK', active_call.output_metadata)
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application error
# code and detail message and some additional app-specific metadata.
@@ -91,7 +110,7 @@ module GRPC
# Log it, but don't notify the other endpoint..
GRPC.logger.warn("failed call: #{active_call}\n#{e}")
rescue Core::OutOfTime
- # This is raised when active_call#method.call exceeeds the deadline
+ # This is raised when active_call#method.call exceeds the deadline
# event. Send a status of deadline exceeded
GRPC.logger.warn("late call: #{active_call}")
send_status(active_call, DEADLINE_EXCEEDED, 'late')
@@ -100,7 +119,7 @@ module GRPC
# Send back a UNKNOWN status to the client
GRPC.logger.warn("failed handler: #{active_call}; sending status:UNKNOWN")
GRPC.logger.warn(e)
- send_status(active_call, UNKNOWN, 'no reason given')
+ send_status(active_call, UNKNOWN, 'unkown error handling call on server')
end
def assert_arity_matches(mth)
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 7dbcb7d479..57f99c8ce6 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -54,6 +54,7 @@ module GRPC
DEFAULT_MAX_WAITING_REQUESTS = 60
# Default poll period is 1s
+ # Used for grpc server shutdown and thread pool shutdown timeouts
DEFAULT_POLL_PERIOD = 1
# Signal check period is 0.25s
@@ -127,7 +128,7 @@ module GRPC
deadline = from_relative_time(@poll_period)
@server.close(deadline)
@pool.shutdown
- @pool.wait_for_termination
+ @pool.wait_for_termination(@poll_period)
end
def running_state
diff --git a/src/ruby/qps/server.rb b/src/ruby/qps/server.rb
index d0c2073dd1..6175855cd9 100644
--- a/src/ruby/qps/server.rb
+++ b/src/ruby/qps/server.rb
@@ -63,7 +63,9 @@ class BenchmarkServer
cred = :this_port_is_insecure
end
# Make sure server can handle the large number of calls in benchmarks
- @server = GRPC::RpcServer.new(pool_size: 100, max_waiting_requests: 100)
+ # TODO: @apolcyn, if scenario config increases total outstanding
+ # calls then will need to increase the pool size too
+ @server = GRPC::RpcServer.new(pool_size: 1024, max_waiting_requests: 1024)
@port = @server.add_http2_port("0.0.0.0:" + port.to_s, cred)
@server.handle(BenchmarkServiceImpl.new)
@start_time = Time.now
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 5ae4f25537..aa51d9d7b1 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -402,7 +402,7 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
- client_call.writes_done(false)
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
server_call.send_status(OK, 'OK')
@@ -460,7 +460,7 @@ describe GRPC::ActiveCall do
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
- client_call.writes_done(false)
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
n = 3 # arbitrary value > 1
@@ -473,7 +473,7 @@ describe GRPC::ActiveCall do
end
end
- describe '#writes_done' do
+ describe '#closing the call from the client' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
ActiveCall.client_invoke(call)
@@ -481,7 +481,9 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
- expect { client_call.writes_done(false) }.to_not raise_error
+ expect do
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
+ end.to_not raise_error
server_call = expect_server_to_receive(msg)
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response')
@@ -500,11 +502,13 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
- expect { client_call.writes_done(false) }.to_not raise_error
+ expect do
+ call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
+ end.to_not raise_error
expect { client_call.finished }.to_not raise_error
end
- it 'finishes ok if writes_done is true' do
+ it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
call = make_test_call
ActiveCall.client_invoke(call)
client_call = ActiveCall.new(call, @pass_through,
@@ -515,7 +519,11 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
server_call.send_status(OK, 'status code is OK')
expect(client_call.remote_read).to eq('server_response')
- expect { client_call.writes_done(true) }.to_not raise_error
+ expect do
+ call.run_batch(
+ CallOps::SEND_CLOSE_FROM_CLIENT => nil,
+ CallOps::RECV_STATUS_ON_CLIENT => nil)
+ end.to_not raise_error
end
end
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 6034b5419c..607a4a3c5d 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -168,42 +168,94 @@ describe 'ClientStub' do
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
end
+
+ it 'should receive UNAUTHENTICATED if call credentials plugin fails' do
+ server_port = create_secure_test_server
+ th = run_request_response(@sent_msg, @resp, @pass)
+
+ certs = load_test_certs
+ secure_channel_creds = GRPC::Core::ChannelCredentials.new(
+ certs[0], nil, nil)
+ secure_stub_opts = {
+ channel_args: {
+ GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
+ }
+ }
+ stub = GRPC::ClientStub.new("localhost:#{server_port}",
+ secure_channel_creds, **secure_stub_opts)
+
+ error_message = 'Failing call credentials callback'
+ failing_auth = proc do
+ fail error_message
+ end
+ creds = GRPC::Core::CallCredentials.new(failing_auth)
+
+ error_occured = false
+ begin
+ get_response(stub, credentials: creds)
+ rescue GRPC::BadStatus => e
+ error_occured = true
+ expect(e.code).to eq(GRPC::Core::StatusCodes::UNAUTHENTICATED)
+ expect(e.details.include?(error_message)).to be true
+ end
+ expect(error_occured).to eq(true)
+
+ # Kill the server thread so tests can complete
+ th.kill
+ end
end
describe 'without a call operation' do
- def get_response(stub)
+ def get_response(stub, credentials: nil)
+ puts credentials.inspect
stub.request_response(@method, @sent_msg, noop, noop,
- metadata: { k1: 'v1', k2: 'v2' })
+ metadata: { k1: 'v1', k2: 'v2' },
+ credentials: credentials)
end
it_behaves_like 'request response'
end
describe 'via a call operation' do
- def get_response(stub)
+ def get_response(stub, run_start_call_first: false, credentials: nil)
op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true,
metadata: { k1: 'v1', k2: 'v2' },
- deadline: from_relative_time(2))
+ deadline: from_relative_time(2),
+ credentials: credentials)
expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.execute
+ op.start_call if run_start_call_first
+ result = op.execute
+ op.wait # make sure wait doesn't hang
+ result
end
it_behaves_like 'request response'
- end
- end
- describe '#client_streamer' do
- shared_examples 'client streaming' do
- before(:each) do
+ it 'sends metadata to the server ok when running start_call first' do
server_port = create_test_server
host = "localhost:#{server_port}"
- @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- @metadata = { k1: 'v1', k2: 'v2' }
- @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
- @resp = 'a_reply'
+ th = run_request_response(@sent_msg, @resp, @pass,
+ k1: 'v1', k2: 'v2')
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect(get_response(stub)).to eq(@resp)
+ th.join
end
+ end
+ end
+
+ describe '#client_streamer' do
+ before(:each) do
+ Thread.abort_on_exception = true
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ @metadata = { k1: 'v1', k2: 'v2' }
+ @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
+ @resp = 'a_reply'
+ end
+ shared_examples 'client streaming' do
it 'should send requests to/receive a reply from a server' do
th = run_client_streamer(@sent_msgs, @resp, @pass)
expect(get_response(@stub)).to eq(@resp)
@@ -242,24 +294,33 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
- def get_response(stub)
+ def get_response(stub, run_start_call_first: false)
op = stub.client_streamer(@method, @sent_msgs, noop, noop,
return_op: true, metadata: @metadata)
expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.execute
+ op.start_call if run_start_call_first
+ result = op.execute
+ op.wait # make sure wait doesn't hang
+ result
end
it_behaves_like 'client streaming'
+
+ it 'sends metadata to the server ok when running start_call first' do
+ th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
+ expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
+ th.join
+ end
end
end
describe '#server_streamer' do
- shared_examples 'server streaming' do
- before(:each) do
- @sent_msg = 'a_msg'
- @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
- end
+ before(:each) do
+ @sent_msg = 'a_msg'
+ @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
+ end
+ shared_examples 'server streaming' do
it 'should send a request to/receive replies from a server' do
server_port = create_test_server
host = "localhost:#{server_port}"
@@ -303,29 +364,44 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
- def get_responses(stub)
- op = stub.server_streamer(@method, @sent_msg, noop, noop,
- return_op: true,
- metadata: { k1: 'v1', k2: 'v2' })
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- e = op.execute
+ after(:each) do
+ @op.wait # make sure wait doesn't hang
+ end
+ def get_responses(stub, run_start_call_first: false)
+ @op = stub.server_streamer(@method, @sent_msg, noop, noop,
+ return_op: true,
+ metadata: { k1: 'v1', k2: 'v2' })
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ e = @op.execute
expect(e).to be_a(Enumerator)
e
end
it_behaves_like 'server streaming'
+
+ it 'should send metadata to the server ok when start_call is run first' do
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ th = run_server_streamer(@sent_msg, @replys, @fail,
+ k1: 'v1', k2: 'v2')
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ e = get_responses(stub, run_start_call_first: true)
+ expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+ th.join
+ end
end
end
describe '#bidi_streamer' do
- shared_examples 'bidi streaming' do
- before(:each) do
- @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
- @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
- server_port = create_test_server
- @host = "localhost:#{server_port}"
- end
+ before(:each) do
+ @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
+ @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
+ server_port = create_test_server
+ @host = "localhost:#{server_port}"
+ end
+ shared_examples 'bidi streaming' do
it 'supports sending all the requests first', bidi: true do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
@@ -363,16 +439,29 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
- def get_responses(stub)
- op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
- return_op: true)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- e = op.execute
+ after(:each) do
+ @op.wait # make sure wait doesn't hang
+ end
+ def get_responses(stub, run_start_call_first: false)
+ @op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+ return_op: true)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ e = @op.execute
expect(e).to be_a(Enumerator)
e
end
it_behaves_like 'bidi streaming'
+
+ it 'can run start_call before executing the call' do
+ th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
+ @pass)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub, run_start_call_first: true)
+ expect(e.collect { |r| r }).to eq(@replys)
+ th.join
+ end
end
end
@@ -441,6 +530,15 @@ describe 'ClientStub' do
end
end
+ def create_secure_test_server
+ certs = load_test_certs
+ secure_credentials = GRPC::Core::ServerCredentials.new(
+ nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)
+
+ @server = GRPC::Core::Server.new(nil)
+ @server.add_http2_port('0.0.0.0:0', secure_credentials)
+ end
+
def create_test_server
@server = GRPC::Core::Server.new(nil)
@server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 1a895005bc..a3f0efa603 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -48,7 +48,7 @@ describe GRPC::RpcDesc do
@bidi_streamer = RpcDesc.new('ss', Stream.new(Object.new),
Stream.new(Object.new), 'encode', 'decode')
@bs_code = INTERNAL
- @no_reason = 'no reason given'
+ @no_reason = 'unkown error handling call on server'
@ok_response = Object.new
end
@@ -83,6 +83,7 @@ describe GRPC::RpcDesc do
before(:each) do
@call = double('active_call')
allow(@call).to receive(:single_req_view).and_return(@call)
+ allow(@call).to receive(:output_metadata).and_return(@call)
end
it_behaves_like 'it handles errors'
@@ -90,10 +91,10 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
req = Object.new
expect(@call).to receive(:remote_read).once.and_return(req)
- expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:output_metadata).and_return(fake_md)
- expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
- metadata: fake_md)
+ expect(@call).to receive(:output_metadata).once.and_return(fake_md)
+ expect(@call).to receive(:server_unary_response).once
+ .with(@ok_response, trailing_metadata: fake_md)
+
this_desc.run_server_method(@call, method(:fake_reqresp))
end
end
@@ -117,7 +118,9 @@ describe GRPC::RpcDesc do
end
it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_send).once.and_raise(CallError)
+ expect(@call).to receive(:server_unary_response).once.and_raise(
+ CallError)
+ allow(@call).to receive(:output_metadata).and_return({})
blk = proc do
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
@@ -125,10 +128,11 @@ describe GRPC::RpcDesc do
end
it 'sends a response and closes the stream if there no errors' do
- expect(@call).to receive(:remote_send).once.with(@ok_response)
- expect(@call).to receive(:output_metadata).and_return(fake_md)
- expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
- metadata: fake_md)
+ expect(@call).to receive(:output_metadata).and_return(
+ fake_md)
+ expect(@call).to receive(:server_unary_response).once
+ .with(@ok_response, trailing_metadata: fake_md)
+
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index d362e48dee..c5694790fd 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -462,6 +462,7 @@ describe GRPC::RpcServer do
'connect_k1' => 'connect_v1'
}
wanted_md.each do |key, value|
+ puts "key: #{key}"
expect(op.metadata[key]).to eq(value)
end
@srv.stop
diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb
index 1b2fa96827..4711e09e88 100644
--- a/src/ruby/spec/pb/health/checker_spec.rb
+++ b/src/ruby/spec/pb/health/checker_spec.rb
@@ -97,15 +97,17 @@ describe Grpc::Health::Checker do
context 'initialization' do
it 'can be constructed with no args' do
- expect(subject).to_not be(nil)
+ checker = Grpc::Health::Checker.new
+ expect(checker).to_not be(nil)
end
end
context 'method `add_status` and `check`' do
success_tests.each do |t|
it "should succeed when #{t[:desc]}" do
- subject.add_status(t[:service], ServingStatus::NOT_SERVING)
- got = subject.check(HCReq.new(service: t[:service]), nil)
+ checker = Grpc::Health::Checker.new
+ checker.add_status(t[:service], ServingStatus::NOT_SERVING)
+ got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want)
end
@@ -115,8 +117,9 @@ describe Grpc::Health::Checker do
context 'method `check`' do
success_tests.each do |t|
it "should fail with NOT_FOUND when #{t[:desc]}" do
+ checker = Grpc::Health::Checker.new
blk = proc do
- subject.check(HCReq.new(service: t[:service]), nil)
+ checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@@ -127,14 +130,15 @@ describe Grpc::Health::Checker do
context 'method `clear_status`' do
success_tests.each do |t|
it "should fail after clearing status when #{t[:desc]}" do
- subject.add_status(t[:service], ServingStatus::NOT_SERVING)
- got = subject.check(HCReq.new(service: t[:service]), nil)
+ checker = Grpc::Health::Checker.new
+ checker.add_status(t[:service], ServingStatus::NOT_SERVING)
+ got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want)
- subject.clear_status(t[:service])
+ checker.clear_status(t[:service])
blk = proc do
- subject.check(HCReq.new(service: t[:service]), nil)
+ checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@@ -144,18 +148,19 @@ describe Grpc::Health::Checker do
context 'method `clear_all`' do
it 'should return NOT_FOUND after being invoked' do
+ checker = Grpc::Health::Checker.new
success_tests.each do |t|
- subject.add_status(t[:service], ServingStatus::NOT_SERVING)
- got = subject.check(HCReq.new(service: t[:service]), nil)
+ checker.add_status(t[:service], ServingStatus::NOT_SERVING)
+ got = checker.check(HCReq.new(service: t[:service]), nil)
want = HCResp.new(status: ServingStatus::NOT_SERVING)
expect(got).to eq(want)
end
- subject.clear_all
+ checker.clear_all
success_tests.each do |t|
blk = proc do
- subject.check(HCReq.new(service: t[:service]), nil)
+ checker.check(HCReq.new(service: t[:service]), nil)
end
expected_msg = /#{StatusCodes::NOT_FOUND}/
expect(&blk).to raise_error GRPC::BadStatus, expected_msg
@@ -184,8 +189,10 @@ describe Grpc::Health::Checker do
end
it 'should receive the correct status', server: true do
- @srv.handle(subject)
- subject.add_status('', ServingStatus::NOT_SERVING)
+ Thread.abort_on_exception = true
+ checker = Grpc::Health::Checker.new
+ @srv.handle(checker)
+ checker.add_status('', ServingStatus::NOT_SERVING)
t = Thread.new { @srv.run }
@srv.wait_till_running
@@ -198,7 +205,8 @@ describe Grpc::Health::Checker do
end
it 'should fail on unknown services', server: true do
- @srv.handle(subject)
+ checker = Grpc::Health::Checker.new
+ @srv.handle(checker)
t = Thread.new { @srv.run }
@srv.wait_till_running
blk = proc do