aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rw-r--r--src/ruby/README.md2
-rw-r--r--src/ruby/ext/grpc/rb_call.c33
-rw-r--r--src/ruby/ext/grpc/rb_call.h3
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c7
-rw-r--r--src/ruby/ext/grpc/rb_channel_credentials.c6
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb172
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb62
-rw-r--r--src/ruby/lib/grpc/generic/rpc_desc.rb4
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb1
-rw-r--r--src/ruby/spec/client_auth_spec.rb137
-rw-r--r--src/ruby/spec/client_server_spec.rb34
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb4
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb404
-rw-r--r--src/ruby/spec/generic/rpc_desc_spec.rb10
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb145
-rw-r--r--src/ruby/spec/testdata/client.key16
-rw-r--r--src/ruby/spec/testdata/client.pem14
17 files changed, 890 insertions, 164 deletions
diff --git a/src/ruby/README.md b/src/ruby/README.md
index 266d6b2c16..5c7dae654a 100644
--- a/src/ruby/README.md
+++ b/src/ruby/README.md
@@ -68,5 +68,5 @@ Directory structure is the layout for [ruby extensions][]
[ruby extensions]:http://guides.rubygems.org/gems-with-extensions/
[rubydoc]: http://www.rubydoc.info/gems/grpc
-[grpc.io]: http://www.grpc.io/docs/quickstart/ruby.html
+[grpc.io]: https://grpc.io/docs/quickstart/ruby.html
[Debian jessie-backports]:http://backports.debian.org/Instructions/
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 67425a68e6..b99954883f 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -24,6 +24,8 @@
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "rb_byte_buffer.h"
#include "rb_call_credentials.h"
@@ -368,7 +370,7 @@ static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
to fill grpc_metadata_array.
it's capacity should have been computed via a prior call to
- grpc_rb_md_ary_fill_hash_cb
+ grpc_rb_md_ary_capacity_hash_cb
*/
static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
grpc_metadata_array *md_ary = NULL;
@@ -376,7 +378,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
long i;
grpc_slice key_slice;
grpc_slice value_slice;
- char *tmp_str;
+ char *tmp_str = NULL;
if (TYPE(key) == T_SYMBOL) {
key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key)));
@@ -386,6 +388,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
} else {
rb_raise(rb_eTypeError,
"grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
+ return ST_STOP;
}
if (!grpc_header_key_is_legal(key_slice)) {
@@ -413,6 +416,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
tmp_str);
return ST_STOP;
}
+ GPR_ASSERT(md_ary->count < md_ary->capacity);
md_ary->metadata[md_ary->count].key = key_slice;
md_ary->metadata[md_ary->count].value = value_slice;
md_ary->count += 1;
@@ -428,6 +432,7 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
tmp_str);
return ST_STOP;
}
+ GPR_ASSERT(md_ary->count < md_ary->capacity);
md_ary->metadata[md_ary->count].key = key_slice;
md_ary->metadata[md_ary->count].value = value_slice;
md_ary->count += 1;
@@ -435,7 +440,6 @@ static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
rb_raise(rb_eArgError, "Header values must be of type string or array");
return ST_STOP;
}
-
return ST_CONTINUE;
}
@@ -458,6 +462,7 @@ static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
} else {
md_ary->capacity += 1;
}
+
return ST_CONTINUE;
}
@@ -480,7 +485,7 @@ void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary) {
md_ary_obj =
TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
- md_ary->metadata = gpr_malloc(md_ary->capacity * sizeof(grpc_metadata));
+ md_ary->metadata = gpr_zalloc(md_ary->capacity * sizeof(grpc_metadata));
rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
}
@@ -611,13 +616,25 @@ static void grpc_run_batch_stack_init(run_batch_stack *st,
st->write_flag = write_flag;
}
+void grpc_rb_metadata_array_destroy_including_entries(
+ grpc_metadata_array *array) {
+ size_t i;
+ if (array->metadata) {
+ for (i = 0; i < array->count; i++) {
+ grpc_slice_unref(array->metadata[i].key);
+ grpc_slice_unref(array->metadata[i].value);
+ }
+ }
+ grpc_metadata_array_destroy(array);
+}
+
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
* cleaned up */
static void grpc_run_batch_stack_cleanup(run_batch_stack *st) {
size_t i = 0;
- grpc_metadata_array_destroy(&st->send_metadata);
- grpc_metadata_array_destroy(&st->send_trailing_metadata);
+ grpc_rb_metadata_array_destroy_including_entries(&st->send_metadata);
+ grpc_rb_metadata_array_destroy_including_entries(&st->send_trailing_metadata);
grpc_metadata_array_destroy(&st->recv_metadata);
grpc_metadata_array_destroy(&st->recv_trailing_metadata);
@@ -658,8 +675,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
st->ops[st->op_num].flags = 0;
switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
- /* N.B. later there is no need to explicitly delete the metadata keys
- * and values, they are references to data in ruby objects. */
grpc_rb_md_ary_convert(this_value, &st->send_metadata);
st->ops[st->op_num].data.send_initial_metadata.count =
st->send_metadata.count;
@@ -675,8 +690,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
case GRPC_OP_SEND_STATUS_FROM_SERVER:
- /* N.B. later there is no need to explicitly delete the metadata keys
- * and values, they are references to data in ruby objects. */
grpc_rb_op_update_status_from_server(
&st->ops[st->op_num], &st->send_trailing_metadata,
&st->send_status_details, this_value);
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index be791cf972..bfe8035e0f 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -40,6 +40,9 @@ VALUE grpc_rb_md_ary_to_h(grpc_metadata_array *md_ary);
*/
void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array *md_ary);
+void grpc_rb_metadata_array_destroy_including_entries(
+ grpc_metadata_array *md_ary);
+
/* grpc_rb_eCallError is the ruby class of the exception thrown during call
operations. */
extern VALUE grpc_rb_eCallError;
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 95e01a2c19..049a869bdc 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -108,7 +108,7 @@ static void grpc_rb_call_credentials_callback_with_gil(void *param) {
error_details = StringValueCStr(details);
params->callback(params->user_data, md_ary.metadata, md_ary.count, status,
error_details);
- grpc_metadata_array_destroy(&md_ary);
+ grpc_rb_metadata_array_destroy_including_entries(&md_ary);
gpr_free(params);
}
@@ -239,6 +239,7 @@ static VALUE grpc_rb_call_credentials_compose(int argc, VALUE *argv,
VALUE self) {
grpc_call_credentials *creds;
grpc_call_credentials *other;
+ grpc_call_credentials *prev = NULL;
VALUE mark;
if (argc == 0) {
return self;
@@ -249,6 +250,10 @@ static VALUE grpc_rb_call_credentials_compose(int argc, VALUE *argv,
rb_ary_push(mark, argv[i]);
other = grpc_rb_get_wrapped_call_credentials(argv[i]);
creds = grpc_composite_call_credentials_create(creds, other, NULL);
+ if (prev != NULL) {
+ grpc_call_credentials_release(prev);
+ }
+ prev = creds;
}
return grpc_rb_wrap_call_credentials(creds, mark);
}
diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c
index 07935a13dc..83601ca694 100644
--- a/src/ruby/ext/grpc/rb_channel_credentials.c
+++ b/src/ruby/ext/grpc/rb_channel_credentials.c
@@ -184,6 +184,7 @@ static VALUE grpc_rb_channel_credentials_compose(int argc, VALUE *argv,
VALUE self) {
grpc_channel_credentials *creds;
grpc_call_credentials *other;
+ grpc_channel_credentials *prev = NULL;
VALUE mark;
if (argc == 0) {
return self;
@@ -195,6 +196,11 @@ static VALUE grpc_rb_channel_credentials_compose(int argc, VALUE *argv,
rb_ary_push(mark, argv[i]);
other = grpc_rb_get_wrapped_call_credentials(argv[i]);
creds = grpc_composite_channel_credentials_create(creds, other, NULL);
+ if (prev != NULL) {
+ grpc_channel_credentials_release(prev);
+ }
+ prev = creds;
+
if (creds == NULL) {
rb_raise(rb_eRuntimeError,
"Failed to compose channel and call credentials");
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index cb407d236d..87b29c26ea 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -40,13 +40,13 @@ end
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
- class ActiveCall
+ class ActiveCall # rubocop:disable Metrics/ClassLength
include Core::TimeConsts
include Core::CallOps
extend Forwardable
- attr_reader :deadline, :metadata_sent, :metadata_to_send
+ attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
- :peer, :peer_cert, :trailing_metadata
+ :trailing_metadata, :status
# client_invoke begins a client invocation.
#
@@ -100,6 +100,18 @@ module GRPC
fail(ArgumentError, 'Already sent md') if started && metadata_to_send
@metadata_to_send = metadata_to_send || {} unless started
@send_initial_md_mutex = Mutex.new
+
+ @output_stream_done = false
+ @input_stream_done = false
+ @call_finished = false
+ @call_finished_mu = Mutex.new
+
+ @client_call_executed = false
+ @client_call_executed_mu = Mutex.new
+
+ # set the peer now so that the accessor can still function
+ # after the server closes the call
+ @peer = call.peer
end
# Sends the initial metadata that has yet to be sent.
@@ -142,11 +154,9 @@ module GRPC
Operation.new(self)
end
- # finished waits until a client call is completed.
- #
- # It blocks until the remote endpoint acknowledges by sending a status.
- def finished
+ def receive_and_check_status
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+ set_input_stream_done
attach_status_results_and_complete_call(batch_result)
end
@@ -155,8 +165,6 @@ module GRPC
@call.trailing_metadata = recv_status_batch_result.status.metadata
end
@call.status = recv_status_batch_result.status
- @call.close
- op_is_done
# The RECV_STATUS in run_batch always succeeds
# Check the status for a bad status or failed run batch
@@ -193,9 +201,19 @@ module GRPC
}
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
@call.run_batch(ops)
+ set_output_stream_done
+
nil
end
+ # Intended for use on server-side calls when a single request from
+ # the client is expected (i.e., unary and server-streaming RPC types).
+ def read_unary_request
+ req = remote_read
+ set_input_stream_done
+ req
+ end
+
def server_unary_response(req, trailing_metadata: {},
code: Core::StatusCodes::OK, details: 'OK')
ops = {}
@@ -211,6 +229,7 @@ module GRPC
ops[RECV_CLOSE_ON_SERVER] = nil
@call.run_batch(ops)
+ set_output_stream_done
end
# remote_read reads a response from the remote endpoint.
@@ -241,6 +260,8 @@ module GRPC
# each_remote_read passes each response to the given block or returns an
# enumerator the responses if no block is given.
+ # Used to generate the request enumerable for
+ # server-side client-streaming RPC's.
#
# == Enumerator ==
#
@@ -258,10 +279,14 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
- loop do
- resp = remote_read
- break if resp.nil? # the last response was received
- yield resp
+ begin
+ loop do
+ resp = remote_read
+ break if resp.nil? # the last response was received
+ yield resp
+ end
+ ensure
+ set_input_stream_done
end
end
@@ -287,13 +312,17 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
- loop do
- resp = remote_read
- if resp.nil? # the last response was received, but not finished yet
- finished
- break
+ begin
+ loop do
+ resp = remote_read
+ if resp.nil? # the last response was received
+ receive_and_check_status
+ break
+ end
+ yield resp
end
- yield resp
+ ensure
+ set_input_stream_done
end
end
@@ -305,6 +334,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
+ raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil,
@@ -319,7 +349,15 @@ module GRPC
end
@metadata_sent = true
end
- batch_result = @call.run_batch(ops)
+
+ begin
+ batch_result = @call.run_batch(ops)
+ # no need to check for cancellation after a CallError because this
+ # batch contains a RECV_STATUS op
+ ensure
+ set_input_stream_done
+ set_output_stream_done
+ end
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
@@ -339,10 +377,20 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
- # Metadata might have already been sent if this is an operation view
- merge_metadata_and_send_if_not_already_sent(metadata)
+ raise_error_if_already_executed
+ begin
+ merge_metadata_and_send_if_not_already_sent(metadata)
+ requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
+ rescue GRPC::Core::CallError => e
+ receive_and_check_status # check for Cancelled
+ raise e
+ rescue => e
+ set_input_stream_done
+ raise e
+ ensure
+ set_output_stream_done
+ end
- requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
batch_result = @call.run_batch(
SEND_CLOSE_FROM_CLIENT => nil,
RECV_INITIAL_METADATA => nil,
@@ -350,12 +398,11 @@ module GRPC
RECV_STATUS_ON_CLIENT => nil
)
+ set_input_stream_done
+
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result)
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -373,6 +420,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {})
+ raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil
@@ -384,13 +432,22 @@ module GRPC
end
@metadata_sent = true
end
- @call.run_batch(ops)
+
+ begin
+ @call.run_batch(ops)
+ rescue GRPC::Core::CallError => e
+ receive_and_check_status # checks for Cancelled
+ raise e
+ rescue => e
+ set_input_stream_done
+ raise e
+ ensure
+ set_output_stream_done
+ end
+
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -421,6 +478,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
+ raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view
merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call,
@@ -428,7 +486,10 @@ module GRPC
@unmarshal,
metadata_received: @metadata_received)
- bd.run_on_client(requests, @op_notifier, &blk)
+ bd.run_on_client(requests,
+ proc { set_input_stream_done },
+ proc { set_output_stream_done },
+ &blk)
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -449,7 +510,7 @@ module GRPC
metadata_received: @metadata_received,
req_view: MultiReqView.new(self))
- bd.run_on_server(gen_each_reply)
+ bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
end
# Waits till an operation completes
@@ -459,7 +520,8 @@ module GRPC
@op_notifier.wait
end
- # Signals that an operation is done
+ # Signals that an operation is done.
+ # Only relevant on the client-side (this is a no-op on the server-side)
def op_is_done
return if @op_notifier.nil?
@op_notifier.notify(self)
@@ -484,8 +546,40 @@ module GRPC
end
end
+ def attach_peer_cert(peer_cert)
+ @peer_cert = peer_cert
+ end
+
private
+ # To be called once the "input stream" has been completelly
+ # read through (i.e, done reading from client or received status)
+ # note this is idempotent
+ def set_input_stream_done
+ @call_finished_mu.synchronize do
+ @input_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+ # To be called once the "output stream" has been completelly
+ # sent through (i.e, done sending from client or sent status)
+ # note this is idempotent
+ def set_output_stream_done
+ @call_finished_mu.synchronize do
+ @output_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+ def maybe_finish_and_close_call_locked
+ return unless @output_stream_done && @input_stream_done
+ return if @call_finished
+ @call_finished = true
+ op_is_done
+ @call.close
+ end
+
# Starts the call if not already started
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
@@ -493,6 +587,15 @@ module GRPC
merge_metadata_to_send(metadata) && send_initial_metadata
end
+ def raise_error_if_already_executed
+ @client_call_executed_mu.synchronize do
+ if @client_call_executed
+ fail GRPC::Core::CallError, 'attempting to re-run a call'
+ end
+ @client_call_executed = true
+ end
+ end
+
def self.view_class(*visible_methods)
Class.new do
extend ::Forwardable
@@ -516,8 +619,9 @@ module GRPC
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
- MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg,
+ MultiReqView = view_class(:cancelled?, :deadline,
:each_remote_read, :metadata, :output_metadata,
+ :peer, :peer_cert,
:send_initial_metadata,
:metadata_to_send,
:merge_metadata_to_send,
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index e54cf78969..9e125cd986 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -62,12 +62,19 @@ module GRPC
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
- # @param op_notifier a Notifier used to signal completion
+ # @param set_input_stream_done [Proc] called back when we're done
+ # reading the input stream
+ # @param set_input_stream_done [Proc] called back when we're done
+ # sending data on the output stream
# @return an Enumerator of requests to yield
- def run_on_client(requests, op_notifier, &blk)
- @op_notifier = op_notifier
- @enq_th = Thread.new { write_loop(requests) }
- read_loop(&blk)
+ def run_on_client(requests,
+ set_input_stream_done,
+ set_output_stream_done,
+ &blk)
+ @enq_th = Thread.new do
+ write_loop(requests, set_output_stream_done: set_output_stream_done)
+ end
+ read_loop(set_input_stream_done, &blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -81,12 +88,17 @@ module GRPC
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
- def run_on_server(gen_each_reply)
+ # @param set_input_steam_done [Proc] call back to call when
+ # the reads have been completely read through.
+ def run_on_server(gen_each_reply, set_input_stream_done)
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
- replys = gen_each_reply.call(read_loop(is_client: false))
+ replys = gen_each_reply.call(
+ read_loop(set_input_stream_done, is_client: false))
elsif gen_each_reply.arity == 2
- replys = gen_each_reply.call(read_loop(is_client: false), @req_view)
+ replys = gen_each_reply.call(
+ read_loop(set_input_stream_done, is_client: false),
+ @req_view)
else
fail 'Illegal arity of reply generator'
end
@@ -99,22 +111,6 @@ module GRPC
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
- # signals that bidi operation is complete
- def notify_done
- return unless @op_notifier
- GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
- @op_notifier.notify(self)
- end
-
- # signals that a bidi operation is complete (read + write)
- def finished
- @done_mutex.synchronize do
- return unless @reads_complete && @writes_complete && !@complete
- @call.close
- @complete = true
- end
- end
-
# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
@@ -127,7 +123,8 @@ module GRPC
batch_result
end
- def write_loop(requests, is_client: true)
+ # set_output_stream_done is relevant on client-side
+ def write_loop(requests, is_client: true, set_output_stream_done: nil)
GRPC.logger.debug('bidi-write-loop: starting')
count = 0
requests.each do |req|
@@ -151,23 +148,20 @@ module GRPC
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
GRPC.logger.debug('bidi-write-loop: done')
- notify_done
- @writes_complete = true
- finished
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
- notify_done
- @writes_complete = true
- finished
raise e
+ ensure
+ set_output_stream_done.call if is_client
end
# Provides an enumerator that yields results of remote reads
- def read_loop(is_client: true)
+ def read_loop(set_input_stream_done, is_client: true)
return enum_for(:read_loop,
+ set_input_stream_done,
is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting')
begin
@@ -201,10 +195,10 @@ module GRPC
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
raise e
+ ensure
+ set_input_stream_done.call
end
GRPC.logger.debug('bidi-read-loop: finished')
- @reads_complete = true
- finished
# Make sure that the write loop is done done before finishing the call.
# Note that blocking is ok at this point because we've already received
# a status
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index ce0097573a..89cf8ff6a0 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -48,7 +48,7 @@ module GRPC
end
def handle_request_response(active_call, mth)
- req = active_call.remote_read
+ req = active_call.read_unary_request
resp = mth.call(req, active_call.single_req_view)
active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata)
@@ -61,7 +61,7 @@ module GRPC
end
def handle_server_streamer(active_call, mth)
- req = active_call.remote_read
+ req = active_call.read_unary_request
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
send_status(active_call, OK, 'OK', active_call.output_metadata)
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ef2cc0ce91..33b3cea1fc 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -418,6 +418,7 @@ module GRPC
metadata_received: true,
started: false,
metadata_to_send: connect_md)
+ c.attach_peer_cert(an_rpc.call.peer_cert)
mth = an_rpc.method.to_sym
[c, mth]
end
diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb
new file mode 100644
index 0000000000..79c9192aa5
--- /dev/null
+++ b/src/ruby/spec/client_auth_spec.rb
@@ -0,0 +1,137 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'grpc'
+
+def create_channel_creds
+ test_root = File.join(File.dirname(__FILE__), 'testdata')
+ files = ['ca.pem', 'client.key', 'client.pem']
+ creds = files.map { |f| File.open(File.join(test_root, f)).read }
+ GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2])
+end
+
+def client_cert
+ test_root = File.join(File.dirname(__FILE__), 'testdata')
+ cert = File.open(File.join(test_root, 'client.pem')).read
+ fail unless cert.is_a?(String)
+ cert
+end
+
+def create_server_creds
+ test_root = File.join(File.dirname(__FILE__), 'testdata')
+ p "test root: #{test_root}"
+ files = ['ca.pem', 'server1.key', 'server1.pem']
+ creds = files.map { |f| File.open(File.join(test_root, f)).read }
+ GRPC::Core::ServerCredentials.new(
+ creds[0],
+ [{ private_key: creds[1], cert_chain: creds[2] }],
+ true) # force client auth
+end
+
+# A test message
+class EchoMsg
+ def self.marshal(_o)
+ ''
+ end
+
+ def self.unmarshal(_o)
+ EchoMsg.new
+ end
+end
+
+# a test service that checks the cert of its peer
+class SslTestService
+ include GRPC::GenericService
+ rpc :an_rpc, EchoMsg, EchoMsg
+ rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
+ rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
+ rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
+
+ def check_peer_cert(call)
+ error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}"
+ fail(error_msg) unless call.peer_cert == client_cert
+ end
+
+ def an_rpc(req, call)
+ check_peer_cert(call)
+ req
+ end
+
+ def a_client_streaming_rpc(call)
+ check_peer_cert(call)
+ call.each_remote_read.each { |r| p r }
+ EchoMsg.new
+ end
+
+ def a_server_streaming_rpc(_, call)
+ check_peer_cert(call)
+ [EchoMsg.new, EchoMsg.new]
+ end
+
+ def a_bidi_rpc(requests, call)
+ check_peer_cert(call)
+ requests.each { |r| p r }
+ [EchoMsg.new, EchoMsg.new]
+ end
+end
+
+SslTestServiceStub = SslTestService.rpc_stub_class
+
+describe 'client-server auth' do
+ RpcServer = GRPC::RpcServer
+
+ before(:all) do
+ server_opts = {
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ port = @srv.add_http2_port('0.0.0.0:0', create_server_creds)
+ @srv.handle(SslTestService)
+ @srv_thd = Thread.new { @srv.run }
+ @srv.wait_till_running
+
+ client_opts = {
+ channel_args: {
+ GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
+ }
+ }
+ @stub = SslTestServiceStub.new("localhost:#{port}",
+ create_channel_creds,
+ **client_opts)
+ end
+
+ after(:all) do
+ expect(@srv.stopped?).to be(false)
+ @srv.stop
+ @srv_thd.join
+ end
+
+ it 'client-server auth with unary RPCs' do
+ @stub.an_rpc(EchoMsg.new)
+ end
+
+ it 'client-server auth with client streaming RPCs' do
+ @stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new])
+ end
+
+ it 'client-server auth with server streaming RPCs' do
+ responses = @stub.a_server_streaming_rpc(EchoMsg.new)
+ responses.each { |r| p r }
+ end
+
+ it 'client-server auth with bidi RPCs' do
+ responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new])
+ responses.each { |r| p r }
+ end
+end
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index af3752c4ac..b48b4179ce 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -448,11 +448,18 @@ describe 'the secure http client/server' do
it_behaves_like 'GRPC metadata delivery works OK' do
end
- it 'modifies metadata with CallCredentials' do
- auth_proc = proc { { 'k1' => 'updated-v1' } }
+ def credentials_update_test(creds_update_md)
+ auth_proc = proc { creds_update_md }
call_creds = GRPC::Core::CallCredentials.new(auth_proc)
- md = { 'k2' => 'v2' }
- expected_md = { 'k1' => 'updated-v1', 'k2' => 'v2' }
+
+ initial_md_key = 'k2'
+ initial_md_val = 'v2'
+ initial_md = {}
+ initial_md[initial_md_key] = initial_md_val
+ expected_md = creds_update_md.clone
+ fail 'bad test param' unless expected_md[initial_md_key].nil?
+ expected_md[initial_md_key] = initial_md_val
+
recvd_rpc = nil
rcv_thread = Thread.new do
recvd_rpc = @server.request_call
@@ -461,7 +468,7 @@ describe 'the secure http client/server' do
call = new_client_call
call.set_credentials! call_creds
client_ops = {
- CallOps::SEND_INITIAL_METADATA => md
+ CallOps::SEND_INITIAL_METADATA => initial_md
}
batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
@@ -473,4 +480,21 @@ describe 'the secure http client/server' do
replace_symbols = Hash[expected_md.each_pair.collect { |x, y| [x.to_s, y] }]
expect(recvd_md).to eq(recvd_md.merge(replace_symbols))
end
+
+ it 'modifies metadata with CallCredentials' do
+ credentials_update_test('k1' => 'updated-v1')
+ end
+
+ it 'modifies large metadata with CallCredentials' do
+ val_array = %w(
+ '00000000000000000000000000000000000000000000000000000000000000',
+ '11111111111111111111111111111111111111111111111111111111111111',
+ )
+ md = {
+ k3: val_array,
+ k4: '0000000000000000000000000000000000000000000000000000000000',
+ keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v1'
+ }
+ credentials_update_test(md)
+ end
end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 72e55ebcce..ec0c294174 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -473,7 +473,7 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response')
server_call.send_status(OK, 'status code is OK')
- expect { client_call.finished }.to_not raise_error
+ expect { client_call.receive_and_check_status }.to_not raise_error
end
it 'finishes ok if the server sends an early status response' do
@@ -490,7 +490,7 @@ describe GRPC::ActiveCall do
expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error
- expect { client_call.finished }.to_not raise_error
+ expect { client_call.receive_and_check_status }.to_not raise_error
end
it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 09b88c7cef..42cff4041b 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -36,6 +36,53 @@ include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
include GRPC::Core::CallOps
+# check that methods on a finished/closed call t crash
+def check_op_view_of_finished_client_call(op_view,
+ expected_metadata,
+ expected_trailing_metadata)
+ # use read_response_stream to try to iterate through
+ # possible response stream
+ fail('need something to attempt reads') unless block_given?
+ expect do
+ resp = op_view.execute
+ yield resp
+ end.to raise_error(GRPC::Core::CallError)
+
+ expect { op_view.start_call }.to raise_error(RuntimeError)
+
+ sanity_check_values_of_accessors(op_view,
+ expected_metadata,
+ expected_trailing_metadata)
+
+ expect do
+ op_view.wait
+ op_view.cancel
+ op_view.write_flag = 1
+ end.to_not raise_error
+end
+
+def sanity_check_values_of_accessors(op_view,
+ expected_metadata,
+ expected_trailing_metadata)
+ expected_status = Struct::Status.new
+ expected_status.code = 0
+ expected_status.details = 'OK'
+ expected_status.metadata = expected_trailing_metadata
+
+ expect(op_view.status).to eq(expected_status)
+ expect(op_view.metadata).to eq(expected_metadata)
+ expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
+
+ expect(op_view.cancelled?).to be(false)
+ expect(op_view.write_flag).to be(nil)
+
+ # The deadline attribute of a call can be either
+ # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
+ # TODO: fix so that the accessor always returns the same type.
+ expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
+ op_view.deadline.is_a?(Time)).to be(true)
+end
+
describe 'ClientStub' do
let(:noop) { proc { |x| x } }
@@ -45,6 +92,7 @@ describe 'ClientStub' do
@method = 'an_rpc_method'
@pass = OK
@fail = INTERNAL
+ @metadata = { k1: 'v1', k2: 'v2' }
end
after(:each) do
@@ -107,7 +155,7 @@ describe 'ClientStub' do
end
end
- describe '#request_response' do
+ describe '#request_response', request_response: true do
before(:each) do
@sent_msg, @resp = 'a_msg', 'a_reply'
end
@@ -122,16 +170,42 @@ describe 'ClientStub' do
th.join
end
- it 'should send metadata to the server ok' do
+ def metadata_test(md)
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
- k1: 'v1', k2: 'v2')
+ expected_metadata: md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ @metadata = md
expect(get_response(stub)).to eq(@resp)
th.join
end
+ it 'should send metadata to the server ok' do
+ metadata_test(k1: 'v1', k2: 'v2')
+ end
+
+ # these tests mostly try to exercise when md might be allocated
+ # instead of inlined
+ it 'should send metadata with multiple large md to the server ok' do
+ val_array = %w(
+ '00000000000000000000000000000000000000000000000000000000000000',
+ '11111111111111111111111111111111111111111111111111111111111111',
+ '22222222222222222222222222222222222222222222222222222222222222',
+ )
+ md = {
+ k1: val_array,
+ k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
+ k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
+ k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
+ keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',
+ 'k66666666666666666666666666666666666666666666666666666' => 'v6',
+ 'k77777777777777777777777777777777777777777777777777777' => 'v7',
+ 'k88888888888888888888888888888888888888888888888888888' => 'v8'
+ }
+ metadata_test(md)
+ end
+
it 'should send a request when configured using an override channel' do
server_port = create_test_server
alt_host = "localhost:#{server_port}"
@@ -187,13 +261,24 @@ describe 'ClientStub' do
# Kill the server thread so tests can complete
th.kill
end
+
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @metadata.merge!(k3: 3)
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect do
+ get_response(stub)
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
end
describe 'without a call operation' do
def get_response(stub, credentials: nil)
puts credentials.inspect
stub.request_response(@method, @sent_msg, noop, noop,
- metadata: { k1: 'v1', k2: 'v2' },
+ metadata: @metadata,
credentials: credentials)
end
@@ -201,40 +286,62 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
+ after(:each) do
+ # make sure op.wait doesn't hang, even if there's a bad status
+ @op.wait
+ end
def get_response(stub, run_start_call_first: false, credentials: nil)
- op = stub.request_response(@method, @sent_msg, noop, noop,
- return_op: true,
- metadata: { k1: 'v1', k2: 'v2' },
- deadline: from_relative_time(2),
- credentials: credentials)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.start_call if run_start_call_first
- result = op.execute
- op.wait # make sure wait doesn't hang
+ @op = stub.request_response(@method, @sent_msg, noop, noop,
+ return_op: true,
+ metadata: @metadata,
+ deadline: from_relative_time(2),
+ credentials: credentials)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ result = @op.execute
result
end
it_behaves_like 'request response'
- it 'sends metadata to the server ok when running start_call first' do
+ def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
- th = run_request_response(@sent_msg, @resp, @pass,
- k1: 'v1', k2: 'v2')
+
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_request_response(
+ @sent_msg, @resp, @pass,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- expect(get_response(stub)).to eq(@resp)
+ expect(
+ get_response(stub,
+ run_start_call_first: run_start_call_first)).to eq(@resp)
th.join
end
+
+ it 'sends metadata to the server ok when running start_call first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
end
end
- describe '#client_streamer' do
+ describe '#client_streamer', client_streamer: true do
before(:each) do
Thread.abort_on_exception = true
server_port = create_test_server
host = "localhost:#{server_port}"
@stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- @metadata = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
end
@@ -247,7 +354,8 @@ describe 'ClientStub' do
end
it 'should send metadata to the server ok' do
- th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
+ th = run_client_streamer(@sent_msgs, @resp, @pass,
+ expected_metadata: @metadata)
expect(get_response(@stub)).to eq(@resp)
th.join
end
@@ -278,27 +386,50 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
+ after(:each) do
+ # make sure op.wait doesn't hang, even if there's a bad status
+ @op.wait
+ end
def get_response(stub, run_start_call_first: false)
- op = stub.client_streamer(@method, @sent_msgs, noop, noop,
- return_op: true, metadata: @metadata)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.start_call if run_start_call_first
- result = op.execute
- op.wait # make sure wait doesn't hang
+ @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
+ return_op: true, metadata: @metadata)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ result = @op.execute
result
end
it_behaves_like 'client streaming'
- it 'sends metadata to the server ok when running start_call first' do
- th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
- expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
+ def run_op_view_metadata_test(run_start_call_first)
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_client_streamer(
+ @sent_msgs, @resp, @pass,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
+ expect(
+ get_response(@stub,
+ run_start_call_first: run_start_call_first)).to eq(@resp)
th.join
end
+
+ it 'sends metadata to the server ok when running start_call first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
end
end
- describe '#server_streamer' do
+ describe '#server_streamer', server_streamer: true do
before(:each) do
@sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -328,18 +459,63 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
- k1: 'v1', k2: 'v2')
+ expected_metadata: { k1: 'v1', k2: 'v2' })
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
+
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @metadata.merge!(k3: 3)
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect do
+ get_responses(stub)
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
+
+ def run_server_streamer_against_client_with_unmarshal_error(
+ expected_input, replys)
+ wakey_thread do |notifier|
+ c = expect_server_to_be_invoked(notifier)
+ expect(c.remote_read).to eq(expected_input)
+ begin
+ replys.each { |r| c.remote_send(r) }
+ rescue GRPC::Core::CallError
+ # An attempt to write to the client might fail. This is ok
+ # because the client call is expected to fail when
+ # unmarshalling the first response, and to cancel the call,
+ # and there is a race as for when the server-side call will
+ # start to fail.
+ p 'remote_send failed (allowed because call expected to cancel)'
+ ensure
+ c.send_status(OK, 'OK', true)
+ end
+ end
+ end
+
+ it 'the call terminates when there is an unmarshalling error' do
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ th = run_server_streamer_against_client_with_unmarshal_error(
+ @sent_msg, @replys)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+
+ unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
+ expect do
+ get_responses(stub, unmarshal: unmarshal).collect { |r| r }
+ end.to raise_error(ArgumentError, 'test unmarshalling error')
+ th.join
+ end
end
describe 'without a call operation' do
- def get_responses(stub)
- e = stub.server_streamer(@method, @sent_msg, noop, noop,
- metadata: { k1: 'v1', k2: 'v2' })
+ def get_responses(stub, unmarshal: noop)
+ e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
+ metadata: @metadata)
expect(e).to be_a(Enumerator)
e
end
@@ -351,10 +527,10 @@ describe 'ClientStub' do
after(:each) do
@op.wait # make sure wait doesn't hang
end
- def get_responses(stub, run_start_call_first: false)
- @op = stub.server_streamer(@method, @sent_msg, noop, noop,
+ def get_responses(stub, run_start_call_first: false, unmarshal: noop)
+ @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
return_op: true,
- metadata: { k1: 'v1', k2: 'v2' })
+ metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first
e = @op.execute
@@ -364,20 +540,41 @@ describe 'ClientStub' do
it_behaves_like 'server streaming'
- it 'should send metadata to the server ok when start_call is run first' do
+ def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
- th = run_server_streamer(@sent_msg, @replys, @fail,
- k1: 'v1', k2: 'v2')
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_server_streamer(
+ @sent_msg, @replys, @pass,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- e = get_responses(stub, run_start_call_first: true)
- expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+ e = get_responses(stub, run_start_call_first: run_start_call_first)
+ expect(e.collect { |r| r }).to eq(@replys)
th.join
end
+
+ it 'should send metadata to the server ok when start_call is run first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
end
end
- describe '#bidi_streamer' do
+ describe '#bidi_streamer', bidi: true do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -386,7 +583,7 @@ describe 'ClientStub' do
end
shared_examples 'bidi streaming' do
- it 'supports sending all the requests first', bidi: true do
+ it 'supports sending all the requests first' do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
@@ -395,7 +592,7 @@ describe 'ClientStub' do
th.join
end
- it 'supports client-initiated ping pong', bidi: true do
+ it 'supports client-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
@@ -403,18 +600,39 @@ describe 'ClientStub' do
th.join
end
- it 'supports a server-initiated ping pong', bidi: true do
+ it 'supports a server-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
+
+ it 'should raise an error if the status is not ok' do
+ th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub)
+ expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+ th.join
+ end
+
+ # TODO: add test for metadata-related ArgumentError in a bidi call once
+ # issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
+
+ it 'should send metadata to the server ok' do
+ th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
+ expected_metadata: @metadata)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub)
+ expect(e.collect { |r| r }).to eq(@sent_msgs)
+ th.join
+ end
end
describe 'without a call operation' do
def get_responses(stub)
- e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
+ e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+ metadata: @metadata)
expect(e).to be_a(Enumerator)
e
end
@@ -428,7 +646,8 @@ describe 'ClientStub' do
end
def get_responses(stub, run_start_call_first: false)
@op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
- return_op: true)
+ return_op: true,
+ metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first
e = @op.execute
@@ -438,27 +657,53 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming'
- it 'can run start_call before executing the call' do
- th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
- @pass)
+ def run_op_view_metadata_test(run_start_call_first)
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_bidi_streamer_echo_ping_pong(
+ @sent_msgs, @pass, true,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
- e = get_responses(stub, run_start_call_first: true)
- expect(e.collect { |r| r }).to eq(@replys)
+ e = get_responses(stub, run_start_call_first: run_start_call_first)
+ expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
+
+ it 'can run start_call before executing the call' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
+
+ it 'doesnt crash when op_view used after call has finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
end
end
- def run_server_streamer(expected_input, replys, status, **kw)
- wanted_metadata = kw.clone
+ def run_server_streamer(expected_input, replys, status,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
expect(c.remote_read).to eq(expected_input)
replys.each { |r| c.remote_send(r) }
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
@@ -472,9 +717,17 @@ describe 'ClientStub' do
end
end
- def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
+ def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
+ wanted_metadata.each do |k, v|
+ expect(c.metadata[k.to_s]).to eq(v)
+ end
expected_inputs.each do |i|
if client_starts
expect(c.remote_read).to eq(i)
@@ -484,33 +737,44 @@ describe 'ClientStub' do
expect(c.remote_read).to eq(i)
end
end
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
- def run_client_streamer(expected_inputs, resp, status, **kw)
- wanted_metadata = kw.clone
+ def run_client_streamer(expected_inputs, resp, status,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
- def run_request_response(expected_input, resp, status, **kw)
- wanted_metadata = kw.clone
+ def run_request_response(expected_input, resp, status,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
@@ -528,13 +792,13 @@ describe 'ClientStub' do
@server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end
- def expect_server_to_be_invoked(notifier)
+ def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
@server.start
notifier.notify(nil)
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
recvd_call.metadata = recvd_rpc.metadata
- recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
+ recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true)
end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 100e9e8487..be578c40d3 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -38,14 +38,14 @@ describe GRPC::RpcDesc do
shared_examples 'it handles errors' do
it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
metadata: {})
this_desc.run_server_method(@call, method(:bad_status))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(UNKNOWN,
arg_error_msg,
false, metadata: {})
@@ -53,7 +53,7 @@ describe GRPC::RpcDesc do
end
it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(CallError)
+ expect(@call).to receive(:read_unary_request).once.and_raise(CallError)
blk = proc do
this_desc.run_server_method(@call, method(:fake_reqresp))
end
@@ -75,7 +75,7 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
req = Object.new
- expect(@call).to receive(:remote_read).once.and_return(req)
+ expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:output_metadata).once.and_return(fake_md)
expect(@call).to receive(:server_unary_response).once
.with(@ok_response, trailing_metadata: fake_md)
@@ -133,7 +133,7 @@ describe GRPC::RpcDesc do
it 'sends a response and closes the stream if there no errors' do
req = Object.new
- expect(@call).to receive(:remote_read).once.and_return(req)
+ expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response)
expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 9633a828a2..e0646f4599 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -111,6 +111,47 @@ end
SlowStub = SlowService.rpc_stub_class
+# a test service that hangs onto call objects
+# and uses them after the server-side call has been
+# finished
+class CheckCallAfterFinishedService
+ include GRPC::GenericService
+ rpc :an_rpc, EchoMsg, EchoMsg
+ rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
+ rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
+ rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
+ attr_reader :server_side_call
+
+ def an_rpc(req, call)
+ fail 'shouldnt reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ req
+ end
+
+ def a_client_streaming_rpc(call)
+ fail 'shouldnt reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ # iterate through requests so call can complete
+ call.each_remote_read.each { |r| p r }
+ EchoMsg.new
+ end
+
+ def a_server_streaming_rpc(_, call)
+ fail 'shouldnt reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ [EchoMsg.new, EchoMsg.new]
+ end
+
+ def a_bidi_rpc(requests, call)
+ fail 'shouldnt reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ requests.each { |r| p r }
+ [EchoMsg.new, EchoMsg.new]
+ end
+end
+
+CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
+
describe GRPC::RpcServer do
RpcServer = GRPC::RpcServer
StatusCodes = GRPC::Core::StatusCodes
@@ -505,5 +546,109 @@ describe GRPC::RpcServer do
t.join
end
end
+
+ context 'when call objects are used after calls have completed' do
+ before(:each) do
+ server_opts = {
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
+ @alt_host = "0.0.0.0:#{alt_port}"
+
+ @service = CheckCallAfterFinishedService.new
+ @srv.handle(@service)
+ @srv_thd = Thread.new { @srv.run }
+ @srv.wait_till_running
+ end
+
+ # check that the server-side call is still in a usable state even
+ # after it has finished
+ def check_single_req_view_of_finished_call(call)
+ common_check_of_finished_server_call(call)
+
+ expect(call.peer).to be_a(String)
+ expect(call.peer_cert).to be(nil)
+ end
+
+ def check_multi_req_view_of_finished_call(call)
+ common_check_of_finished_server_call(call)
+
+ expect do
+ call.each_remote_read.each { |r| p r }
+ end.to raise_error(GRPC::Core::CallError)
+ end
+
+ def common_check_of_finished_server_call(call)
+ expect do
+ call.merge_metadata_to_send({})
+ end.to raise_error(RuntimeError)
+
+ expect do
+ call.send_initial_metadata
+ end.to_not raise_error
+
+ expect(call.cancelled?).to be(false)
+ expect(call.metadata).to be_a(Hash)
+ expect(call.metadata['user-agent']).to be_a(String)
+
+ expect(call.metadata_sent).to be(true)
+ expect(call.output_metadata).to eq({})
+ expect(call.metadata_to_send).to eq({})
+ expect(call.deadline.is_a?(Time)).to be(true)
+ end
+
+ it 'should not crash when call used after an unary call is finished' do
+ req = EchoMsg.new
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ resp = stub.an_rpc(req)
+ expect(resp).to be_a(EchoMsg)
+ @srv.stop
+ @srv_thd.join
+
+ check_single_req_view_of_finished_call(@service.server_side_call)
+ end
+
+ it 'should not crash when call used after client streaming finished' do
+ requests = [EchoMsg.new, EchoMsg.new]
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ resp = stub.a_client_streaming_rpc(requests)
+ expect(resp).to be_a(EchoMsg)
+ @srv.stop
+ @srv_thd.join
+
+ check_multi_req_view_of_finished_call(@service.server_side_call)
+ end
+
+ it 'should not crash when call used after server streaming finished' do
+ req = EchoMsg.new
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ responses = stub.a_server_streaming_rpc(req)
+ responses.each do |r|
+ expect(r).to be_a(EchoMsg)
+ end
+ @srv.stop
+ @srv_thd.join
+
+ check_single_req_view_of_finished_call(@service.server_side_call)
+ end
+
+ it 'should not crash when call used after a bidi call is finished' do
+ requests = [EchoMsg.new, EchoMsg.new]
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ responses = stub.a_bidi_rpc(requests)
+ responses.each do |r|
+ expect(r).to be_a(EchoMsg)
+ end
+ @srv.stop
+ @srv_thd.join
+
+ check_multi_req_view_of_finished_call(@service.server_side_call)
+ end
+ end
end
end
diff --git a/src/ruby/spec/testdata/client.key b/src/ruby/spec/testdata/client.key
new file mode 100644
index 0000000000..f48d0735d9
--- /dev/null
+++ b/src/ruby/spec/testdata/client.key
@@ -0,0 +1,16 @@
+-----BEGIN PRIVATE KEY-----
+MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM
+s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM
+JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT
+NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS
+k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH
+0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS
+W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI
+w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5
+0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5
+/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/
+U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP
+1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd
+9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI
+JiqOszq9GWESErAatg==
+-----END PRIVATE KEY-----
diff --git a/src/ruby/spec/testdata/client.pem b/src/ruby/spec/testdata/client.pem
new file mode 100644
index 0000000000..e332091019
--- /dev/null
+++ b/src/ruby/spec/testdata/client.pem
@@ -0,0 +1,14 @@
+-----BEGIN CERTIFICATE-----
+MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV
+BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0
+ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw
+MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB
+nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX
+b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz
+W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw
+R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/
+T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk
+tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C
+OO+svdkmqH0KZo320ZUqdl2ooQ==
+-----END CERTIFICATE-----