aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/ruby
diff options
context:
space:
mode:
Diffstat (limited to 'src/ruby')
-rwxr-xr-xsrc/ruby/bin/math_services.rb6
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c5
-rw-r--r--src/ruby/ext/grpc/rb_call.c149
-rw-r--r--src/ruby/ext/grpc/rb_call.h2
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c31
-rw-r--r--src/ruby/ext/grpc/rb_channel.c54
-rw-r--r--src/ruby/ext/grpc/rb_channel_credentials.c32
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c149
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.h9
-rw-r--r--src/ruby/ext/grpc/rb_compression_options.c464
-rw-r--r--src/ruby/ext/grpc/rb_compression_options.h44
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c10
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h5
-rw-r--r--src/ruby/ext/grpc/rb_server.c214
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.c37
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb95
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb42
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb11
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb40
-rw-r--r--src/ruby/lib/grpc/generic/service.rb2
-rw-r--r--src/ruby/lib/grpc/version.rb2
-rw-r--r--src/ruby/pb/src/proto/grpc/testing/messages.rb18
-rwxr-xr-xsrc/ruby/pb/test/client.rb184
-rw-r--r--src/ruby/pb/test/proto/empty.rb15
-rw-r--r--src/ruby/pb/test/proto/messages.rb80
-rw-r--r--src/ruby/pb/test/proto/test.rb14
-rw-r--r--src/ruby/pb/test/proto/test_services.rb64
-rwxr-xr-xsrc/ruby/pb/test/server.rb18
-rw-r--r--src/ruby/qps/src/proto/grpc/testing/messages.rb18
-rw-r--r--src/ruby/spec/call_spec.rb3
-rw-r--r--src/ruby/spec/channel_spec.rb5
-rw-r--r--src/ruby/spec/client_server_spec.rb133
-rw-r--r--src/ruby/spec/completion_queue_spec.rb42
-rw-r--r--src/ruby/spec/compression_options_spec.rb164
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb152
-rw-r--r--src/ruby/spec/generic/client_stub_spec.rb75
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb38
-rw-r--r--src/ruby/spec/pb/health/checker_spec.rb2
-rw-r--r--src/ruby/spec/server_spec.rb44
-rw-r--r--src/ruby/tools/version.rb2
41 files changed, 1370 insertions, 1106 deletions
diff --git a/src/ruby/bin/math_services.rb b/src/ruby/bin/math_services.rb
index 34c36abdda..2b97602b6f 100755
--- a/src/ruby/bin/math_services.rb
+++ b/src/ruby/bin/math_services.rb
@@ -44,15 +44,15 @@ module Math
self.unmarshal_class_method = :decode
self.service_name = 'math.Math'
- # Div divides args.dividend by args.divisor and returns the quotient and
- # remainder.
+ # Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
+ # and remainder.
rpc :Div, DivArgs, DivReply
# DivMany accepts an arbitrary number of division args from the client stream
# and sends back the results in the reply stream. The stream continues until
# the client closes its end; the server does the same after sending all the
# replies. The stream ends immediately if either end aborts.
rpc :DivMany, stream(DivArgs), stream(DivReply)
- # Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
+ # Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib
# generates up to limit numbers; otherwise it continues until the call is
# canceled. Unlike Fib above, Fib has no final FibReply.
rpc :Fib, FibArgs, stream(Num)
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index 1172691116..61b7c30315 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -56,7 +56,10 @@ VALUE grpc_rb_byte_buffer_to_s(grpc_byte_buffer *buffer) {
return Qnil;
}
rb_string = rb_str_buf_new(grpc_byte_buffer_length(buffer));
- grpc_byte_buffer_reader_init(&reader, buffer);
+ if (!grpc_byte_buffer_reader_init(&reader, buffer)) {
+ rb_raise(rb_eRuntimeError, "Error initializing byte buffer reader.");
+ return Qnil;
+ }
while (grpc_byte_buffer_reader_next(&reader, &next) != 0) {
rb_str_cat(rb_string, (const char *) GPR_SLICE_START_PTR(next),
GPR_SLICE_LENGTH(next));
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index b436057c16..67a42af619 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -38,6 +38,7 @@
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/impl/codegen/compression_types.h>
#include "rb_byte_buffer.h"
#include "rb_call_credentials.h"
@@ -63,27 +64,18 @@ static VALUE grpc_rb_sBatchResult;
* grpc_metadata_array. */
static VALUE grpc_rb_cMdAry;
-/* id_cq is the name of the hidden ivar that preserves a reference to a
- * completion queue */
-static ID id_cq;
-
-/* id_flags is the name of the hidden ivar that preserves the value of
- * the flags used to create metadata from a Hash */
-static ID id_flags;
-
/* id_credentials is the name of the hidden ivar that preserves the value
* of the credentials added to the call */
static ID id_credentials;
-/* id_input_md is the name of the hidden ivar that preserves the hash used to
- * create metadata, so that references to the strings it contains last as long
- * as the call the metadata is added to. */
-static ID id_input_md;
-
/* id_metadata is name of the attribute used to access the metadata hash
* received by the call and subsequently saved on it. */
static ID id_metadata;
+/* id_trailing_metadata is the name of the attribute used to access the trailing
+ * metadata hash received by the call and subsequently saved on it. */
+static ID id_trailing_metadata;
+
/* id_status is name of the attribute used to access the status object
* received by the call and subsequently saved on it. */
static ID id_status;
@@ -101,14 +93,27 @@ static VALUE sym_message;
static VALUE sym_status;
static VALUE sym_cancelled;
+typedef struct grpc_rb_call {
+ grpc_call *wrapped;
+ grpc_completion_queue *queue;
+} grpc_rb_call;
+
+static void destroy_call(grpc_rb_call *call) {
+ /* Ensure that we only try to destroy the call once */
+ if (call->wrapped != NULL) {
+ grpc_call_destroy(call->wrapped);
+ call->wrapped = NULL;
+ grpc_rb_completion_queue_destroy(call->queue);
+ call->queue = NULL;
+ }
+}
+
/* Destroys a Call. */
static void grpc_rb_call_destroy(void *p) {
- grpc_call* call = NULL;
if (p == NULL) {
return;
}
- call = (grpc_call *)p;
- grpc_call_destroy(call);
+ destroy_call((grpc_rb_call*)p);
}
static size_t md_ary_datasize(const void *p) {
@@ -167,15 +172,15 @@ const char *grpc_call_error_detail_of(grpc_call_error err) {
/* Called by clients to cancel an RPC on the server.
Can be called multiple times, from any thread. */
static VALUE grpc_rb_call_cancel(VALUE self) {
- grpc_call *call = NULL;
+ grpc_rb_call *call = NULL;
grpc_call_error err;
if (RTYPEDDATA_DATA(self) == NULL) {
//This call has been closed
return Qnil;
}
- TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
- err = grpc_call_cancel(call, NULL);
+ TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
+ err = grpc_call_cancel(call->wrapped, NULL);
if (err != GRPC_CALL_OK) {
rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
@@ -189,10 +194,10 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
processed.
*/
static VALUE grpc_rb_call_close(VALUE self) {
- grpc_call *call = NULL;
- TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+ grpc_rb_call *call = NULL;
+ TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
if(call != NULL) {
- grpc_call_destroy(call);
+ destroy_call(call);
RTYPEDDATA_DATA(self) = NULL;
}
return Qnil;
@@ -201,14 +206,14 @@ static VALUE grpc_rb_call_close(VALUE self) {
/* Called to obtain the peer that this call is connected to. */
static VALUE grpc_rb_call_get_peer(VALUE self) {
VALUE res = Qnil;
- grpc_call *call = NULL;
+ grpc_rb_call *call = NULL;
char *peer = NULL;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
return Qnil;
}
- TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
- peer = grpc_call_get_peer(call);
+ TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
+ peer = grpc_call_get_peer(call->wrapped);
res = rb_str_new2(peer);
gpr_free(peer);
@@ -217,16 +222,16 @@ static VALUE grpc_rb_call_get_peer(VALUE self) {
/* Called to obtain the x509 cert of an authenticated peer. */
static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
- grpc_call *call = NULL;
+ grpc_rb_call *call = NULL;
VALUE res = Qnil;
grpc_auth_context *ctx = NULL;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
return Qnil;
}
- TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+ TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
- ctx = grpc_call_auth_context(call);
+ ctx = grpc_call_auth_context(call->wrapped);
if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) {
return Qnil;
@@ -298,6 +303,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
/*
call-seq:
+ trailing_metadata = call.trailing_metadata
+
+ Gets the trailing metadata object saved on the call */
+static VALUE grpc_rb_call_get_trailing_metadata(VALUE self) {
+ return rb_ivar_get(self, id_trailing_metadata);
+}
+
+/*
+ call-seq:
+ call.trailing_metadata = trailing_metadata
+
+ Saves the trailing metadata hash on the call. */
+static VALUE grpc_rb_call_set_trailing_metadata(VALUE self, VALUE metadata) {
+ if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
+ rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
+ rb_obj_classname(metadata));
+ return Qnil;
+ }
+
+ return rb_ivar_set(self, id_trailing_metadata, metadata);
+}
+
+/*
+ call-seq:
write_flag = call.write_flag
Gets the write_flag value saved the call. */
@@ -326,21 +355,23 @@ static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
Sets credentials on a call */
static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
- grpc_call *call = NULL;
+ grpc_rb_call *call = NULL;
grpc_call_credentials *creds;
grpc_call_error err;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
return Qnil;
}
- TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+ TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
creds = grpc_rb_get_wrapped_call_credentials(credentials);
- err = grpc_call_set_credentials(call, creds);
+ err = grpc_call_set_credentials(call->wrapped, creds);
if (err != GRPC_CALL_OK) {
rb_raise(grpc_rb_eCallError,
"grpc_call_set_credentials failed with %s (code=%d)",
grpc_call_error_detail_of(err), err);
}
+ /* We need the credentials to be alive for as long as the call is alive,
+ but we don't care about destruction order. */
rb_ivar_set(self, id_credentials, credentials);
return Qnil;
}
@@ -733,7 +764,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
}
/* call-seq:
- cq = CompletionQueue.new
ops = {
GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
@@ -741,7 +771,7 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
}
tag = Object.new
timeout = 10
- call.start_batch(cq, tag, timeout, ops)
+ call.start_batch(tag, timeout, ops)
Start a batch of operations defined in the array ops; when complete, post a
completion of type 'tag' to the completion queue bound to the call.
@@ -750,20 +780,20 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) {
The order of ops specified in the batch has no significance.
Only one operation of each type can be active at once in any given
batch */
-static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
- VALUE timeout, VALUE ops_hash) {
+static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
run_batch_stack st;
- grpc_call *call = NULL;
+ grpc_rb_call *call = NULL;
grpc_event ev;
grpc_call_error err;
VALUE result = Qnil;
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
unsigned write_flag = 0;
+ void *tag = (void*)&st;
if (RTYPEDDATA_DATA(self) == NULL) {
rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
return Qnil;
}
- TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
+ TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
if (TYPE(ops_hash) != T_HASH) {
@@ -778,7 +808,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
/* call grpc_call_start_batch, then wait for it to complete using
* pluck_event */
- err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag), NULL);
+ err = grpc_call_start_batch(call->wrapped, st.ops, st.op_num, tag, NULL);
if (err != GRPC_CALL_OK) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError,
@@ -786,13 +816,11 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_call_error_detail_of(err), err);
return Qnil;
}
- ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout);
- if (ev.type == GRPC_QUEUE_TIMEOUT) {
- grpc_run_batch_stack_cleanup(&st);
- rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out");
- return Qnil;
+ ev = rb_completion_queue_pluck(call->queue, tag,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ if (!ev.success) {
+ rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
}
-
/* Build and return the BatchResult struct result,
if there is an error, it's reflected in the status */
result = grpc_run_batch_stack_build_result(&st);
@@ -883,6 +911,12 @@ static void Init_grpc_op_codes() {
UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER));
}
+static void Init_grpc_metadata_keys() {
+ VALUE grpc_rb_mMetadataKeys = rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
+ rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM",
+ rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY));
+}
+
void Init_grpc_call() {
/* CallError inherits from Exception to signal that it is non-recoverable */
grpc_rb_eCallError =
@@ -900,7 +934,7 @@ void Init_grpc_call() {
1);
/* Add ruby analogues of the Call methods. */
- rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4);
+ rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
@@ -909,6 +943,10 @@ void Init_grpc_call() {
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
+ rb_define_method(grpc_rb_cCall, "trailing_metadata",
+ grpc_rb_call_get_trailing_metadata, 0);
+ rb_define_method(grpc_rb_cCall, "trailing_metadata=",
+ grpc_rb_call_set_trailing_metadata, 1);
rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
1);
@@ -917,13 +955,11 @@ void Init_grpc_call() {
/* Ids used to support call attributes */
id_metadata = rb_intern("metadata");
+ id_trailing_metadata = rb_intern("trailing_metadata");
id_status = rb_intern("status");
id_write_flag = rb_intern("write_flag");
/* Ids used by the c wrapping internals. */
- id_cq = rb_intern("__cq");
- id_flags = rb_intern("__flags");
- id_input_md = rb_intern("__input_md");
id_credentials = rb_intern("__credentials");
/* Ids used in constructing the batch result. */
@@ -943,19 +979,24 @@ void Init_grpc_call() {
Init_grpc_error_codes();
Init_grpc_op_codes();
Init_grpc_write_flags();
+ Init_grpc_metadata_keys();
}
/* Gets the call from the ruby object */
grpc_call *grpc_rb_get_wrapped_call(VALUE v) {
- grpc_call *c = NULL;
- TypedData_Get_Struct(v, grpc_call, &grpc_call_data_type, c);
- return c;
+ grpc_rb_call *call = NULL;
+ TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call);
+ return call->wrapped;
}
/* Obtains the wrapped object for a given call */
-VALUE grpc_rb_wrap_call(grpc_call *c) {
- if (c == NULL) {
+VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q) {
+ grpc_rb_call *wrapper;
+ if (c == NULL || q == NULL) {
return Qnil;
}
- return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, c);
+ wrapper = ALLOC(grpc_rb_call);
+ wrapper->wrapped = c;
+ wrapper->queue = q;
+ return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
}
diff --git a/src/ruby/ext/grpc/rb_call.h b/src/ruby/ext/grpc/rb_call.h
index 24adb3477b..56becdc5a4 100644
--- a/src/ruby/ext/grpc/rb_call.h
+++ b/src/ruby/ext/grpc/rb_call.h
@@ -42,7 +42,7 @@
grpc_call* grpc_rb_get_wrapped_call(VALUE v);
/* Gets the VALUE corresponding to given grpc_call. */
-VALUE grpc_rb_wrap_call(grpc_call* c);
+VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q);
/* Provides the details of an call error */
const char* grpc_call_error_detail_of(grpc_call_error err);
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 79ca5b32ce..9b6675da84 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -211,35 +211,6 @@ VALUE grpc_rb_wrap_call_credentials(grpc_call_credentials *c, VALUE mark) {
return rb_wrapper;
}
-/* Clones CallCredentials instances.
- Gives CallCredentials a consistent implementation of Ruby's object copy/dup
- protocol. */
-static VALUE grpc_rb_call_credentials_init_copy(VALUE copy, VALUE orig) {
- grpc_rb_call_credentials *orig_cred = NULL;
- grpc_rb_call_credentials *copy_cred = NULL;
-
- if (copy == orig) {
- return copy;
- }
-
- /* Raise an error if orig is not a credentials object or a subclass. */
- if (TYPE(orig) != T_DATA ||
- RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_call_credentials_free) {
- rb_raise(rb_eTypeError, "not a %s",
- rb_obj_classname(grpc_rb_cCallCredentials));
- }
-
- TypedData_Get_Struct(orig, grpc_rb_call_credentials,
- &grpc_rb_call_credentials_data_type, orig_cred);
- TypedData_Get_Struct(copy, grpc_rb_call_credentials,
- &grpc_rb_call_credentials_data_type, copy_cred);
-
- /* use ruby's MEMCPY to make a byte-for-byte copy of the credentials
- * wrapper object. */
- MEMCPY(copy_cred, orig_cred, grpc_rb_call_credentials, 1);
- return copy;
-}
-
/* The attribute used on the mark object to hold the callback */
static ID id_callback;
@@ -308,7 +279,7 @@ void Init_grpc_call_credentials() {
rb_define_method(grpc_rb_cCallCredentials, "initialize",
grpc_rb_call_credentials_init, 1);
rb_define_method(grpc_rb_cCallCredentials, "initialize_copy",
- grpc_rb_call_credentials_init_copy, 1);
+ grpc_rb_cannot_init_copy, 1);
rb_define_method(grpc_rb_cCallCredentials, "compose",
grpc_rb_call_credentials_compose, -1);
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 6943c93d4a..18a15d0125 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -39,6 +39,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "rb_grpc.h"
#include "rb_call.h"
#include "rb_channel_args.h"
@@ -55,11 +56,6 @@ static ID id_channel;
* GCed before the channel */
static ID id_target;
-/* id_cqueue is the name of the hidden ivar that preserves a reference to the
- * completion queue used to create the call, preserved so that it does not get
- * GCed before the channel */
-static ID id_cqueue;
-
/* id_insecure_channel is used to indicate that a channel is insecure */
static VALUE id_insecure_channel;
@@ -231,40 +227,11 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self,
return Qnil;
}
-/* Clones Channel instances.
-
- Gives Channel a consistent implementation of Ruby's object copy/dup
- protocol. */
-static VALUE grpc_rb_channel_init_copy(VALUE copy, VALUE orig) {
- grpc_rb_channel *orig_ch = NULL;
- grpc_rb_channel *copy_ch = NULL;
-
- if (copy == orig) {
- return copy;
- }
-
- /* Raise an error if orig is not a channel object or a subclass. */
- if (TYPE(orig) != T_DATA ||
- RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_channel_free) {
- rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cChannel));
- return Qnil;
- }
-
- TypedData_Get_Struct(orig, grpc_rb_channel, &grpc_channel_data_type, orig_ch);
- TypedData_Get_Struct(copy, grpc_rb_channel, &grpc_channel_data_type, copy_ch);
-
- /* use ruby's MEMCPY to make a byte-for-byte copy of the channel wrapper
- * object. */
- MEMCPY(copy_ch, orig_ch, grpc_rb_channel, 1);
- return copy;
-}
-
/* Create a call given a grpc_channel, in order to call method. The request
is not sent until grpc_call_invoke is called. */
-static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
- VALUE parent, VALUE mask,
- VALUE method, VALUE host,
- VALUE deadline) {
+static VALUE grpc_rb_channel_create_call(VALUE self, VALUE parent,
+ VALUE mask, VALUE method,
+ VALUE host, VALUE deadline) {
VALUE res = Qnil;
grpc_rb_channel *wrapper = NULL;
grpc_call *call = NULL;
@@ -284,7 +251,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
parent_call = grpc_rb_get_wrapped_call(parent);
}
- cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+ cq = grpc_completion_queue_create(NULL);
TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper);
ch = wrapper->wrapped;
if (ch == NULL) {
@@ -301,15 +268,11 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
method_chars);
return Qnil;
}
- res = grpc_rb_wrap_call(call);
+ res = grpc_rb_wrap_call(call, cq);
/* Make this channel an instance attribute of the call so that it is not GCed
* before the call. */
rb_ivar_set(res, id_channel, self);
-
- /* Make the completion queue an instance attribute of the call so that it is
- * not GCed before the call. */
- rb_ivar_set(res, id_cqueue, cqueue);
return res;
}
@@ -387,7 +350,7 @@ void Init_grpc_channel() {
/* Provides a ruby constructor and support for dup/clone. */
rb_define_method(grpc_rb_cChannel, "initialize", grpc_rb_channel_init, -1);
rb_define_method(grpc_rb_cChannel, "initialize_copy",
- grpc_rb_channel_init_copy, 1);
+ grpc_rb_cannot_init_copy, 1);
/* Add ruby analogues of the Channel methods. */
rb_define_method(grpc_rb_cChannel, "connectivity_state",
@@ -396,13 +359,12 @@ void Init_grpc_channel() {
rb_define_method(grpc_rb_cChannel, "watch_connectivity_state",
grpc_rb_channel_watch_connectivity_state, 4);
rb_define_method(grpc_rb_cChannel, "create_call",
- grpc_rb_channel_create_call, 6);
+ grpc_rb_channel_create_call, 5);
rb_define_method(grpc_rb_cChannel, "target", grpc_rb_channel_get_target, 0);
rb_define_method(grpc_rb_cChannel, "destroy", grpc_rb_channel_destroy, 0);
rb_define_alias(grpc_rb_cChannel, "close", "destroy");
id_channel = rb_intern("__channel");
- id_cqueue = rb_intern("__cqueue");
id_target = rb_intern("__target");
rb_define_const(grpc_rb_cChannel, "SSL_TARGET",
ID2SYM(rb_intern(GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)));
diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c
index cbb23885aa..5b7aa3417e 100644
--- a/src/ruby/ext/grpc/rb_channel_credentials.c
+++ b/src/ruby/ext/grpc/rb_channel_credentials.c
@@ -126,36 +126,6 @@ VALUE grpc_rb_wrap_channel_credentials(grpc_channel_credentials *c, VALUE mark)
return rb_wrapper;
}
-/* Clones ChannelCredentials instances.
- Gives ChannelCredentials a consistent implementation of Ruby's object copy/dup
- protocol. */
-static VALUE grpc_rb_channel_credentials_init_copy(VALUE copy, VALUE orig) {
- grpc_rb_channel_credentials *orig_cred = NULL;
- grpc_rb_channel_credentials *copy_cred = NULL;
-
- if (copy == orig) {
- return copy;
- }
-
- /* Raise an error if orig is not a credentials object or a subclass. */
- if (TYPE(orig) != T_DATA ||
- RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_channel_credentials_free) {
- rb_raise(rb_eTypeError, "not a %s",
- rb_obj_classname(grpc_rb_cChannelCredentials));
- }
-
- TypedData_Get_Struct(orig, grpc_rb_channel_credentials,
- &grpc_rb_channel_credentials_data_type, orig_cred);
- TypedData_Get_Struct(copy, grpc_rb_channel_credentials,
- &grpc_rb_channel_credentials_data_type, copy_cred);
-
- /* use ruby's MEMCPY to make a byte-for-byte copy of the credentials
- * wrapper object. */
- MEMCPY(copy_cred, orig_cred, grpc_rb_channel_credentials, 1);
- return copy;
-}
-
-
/* The attribute used on the mark object to hold the pem_root_certs. */
static ID id_pem_root_certs;
@@ -271,7 +241,7 @@ void Init_grpc_channel_credentials() {
rb_define_method(grpc_rb_cChannelCredentials, "initialize",
grpc_rb_channel_credentials_init, -1);
rb_define_method(grpc_rb_cChannelCredentials, "initialize_copy",
- grpc_rb_channel_credentials_init_copy, 1);
+ grpc_rb_cannot_init_copy, 1);
rb_define_method(grpc_rb_cChannelCredentials, "compose",
grpc_rb_channel_credentials_compose, -1);
rb_define_module_function(grpc_rb_cChannelCredentials,
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 9466402db0..fd75d2f691 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -40,12 +40,9 @@
#include <grpc/grpc.h>
#include <grpc/support/time.h>
+#include <grpc/support/log.h>
#include "rb_grpc.h"
-/* grpc_rb_cCompletionQueue is the ruby class that proxies
- * grpc_completion_queue. */
-static VALUE grpc_rb_cCompletionQueue = Qnil;
-
/* Used to allow grpc_completion_queue_next call to release the GIL */
typedef struct next_call_stack {
grpc_completion_queue *cq;
@@ -55,23 +52,6 @@ typedef struct next_call_stack {
volatile int interrupted;
} next_call_stack;
-/* Calls grpc_completion_queue_next without holding the ruby GIL */
-static void *grpc_rb_completion_queue_next_no_gil(void *param) {
- next_call_stack *const next_call = (next_call_stack*)param;
- gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
- gpr_timespec deadline;
- do {
- deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
- next_call->event = grpc_completion_queue_next(next_call->cq,
- deadline, NULL);
- if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
- gpr_time_cmp(deadline, next_call->timeout) > 0) {
- break;
- }
- } while (!next_call->interrupted);
- return NULL;
-}
-
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
@@ -90,107 +70,32 @@ static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
return NULL;
}
-/* Shuts down and drains the completion queue if necessary.
- *
- * This is done when the ruby completion queue object is about to be GCed.
- */
-static void grpc_rb_completion_queue_shutdown_drain(grpc_completion_queue *cq) {
- next_call_stack next_call;
- grpc_completion_type type;
- int drained = 0;
- MEMZERO(&next_call, next_call_stack, 1);
-
- grpc_completion_queue_shutdown(cq);
- next_call.cq = cq;
- next_call.event.type = GRPC_QUEUE_TIMEOUT;
- /* TODO: the timeout should be a module level constant that defaults
- * to gpr_inf_future(GPR_CLOCK_REALTIME).
- *
- * - at the moment this does not work, it stalls. Using a small timeout like
- * this one works, and leads to fast test run times; a longer timeout was
- * causing unnecessary delays in the test runs.
- *
- * - investigate further, this is probably another example of C-level cleanup
- * not working consistently in all cases.
- */
- next_call.timeout = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(5e3, GPR_TIMESPAN));
- do {
- rb_thread_call_without_gvl(grpc_rb_completion_queue_next_no_gil,
- (void *)&next_call, NULL, NULL);
- type = next_call.event.type;
- if (type == GRPC_QUEUE_TIMEOUT) break;
- if (type != GRPC_QUEUE_SHUTDOWN) {
- ++drained;
- rb_warning("completion queue shutdown: %d undrained events", drained);
- }
- } while (type != GRPC_QUEUE_SHUTDOWN);
-}
-
/* Helper function to free a completion queue. */
-static void grpc_rb_completion_queue_destroy(void *p) {
- grpc_completion_queue *cq = NULL;
- if (p == NULL) {
- return;
- }
- cq = (grpc_completion_queue *)p;
- grpc_rb_completion_queue_shutdown_drain(cq);
+void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq) {
+ /* Every function that adds an event to a queue also synchronously plucks
+ that event from the queue, and holds a reference to the Ruby object that
+ holds the queue, so we only get to this point if all of those functions
+ have completed, and the queue is empty */
+ grpc_completion_queue_shutdown(cq);
grpc_completion_queue_destroy(cq);
}
-static rb_data_type_t grpc_rb_completion_queue_data_type = {
- "grpc_completion_queue",
- {GRPC_RB_GC_NOT_MARKED, grpc_rb_completion_queue_destroy,
- GRPC_RB_MEMSIZE_UNAVAILABLE, {NULL, NULL}},
- NULL, NULL,
-#ifdef RUBY_TYPED_FREE_IMMEDIATELY
- /* cannot immediately free because grpc_rb_completion_queue_shutdown_drain
- * calls rb_thread_call_without_gvl. */
- 0,
-#endif
-};
-
-/* Releases the c-level resources associated with a completion queue */
-static VALUE grpc_rb_completion_queue_close(VALUE self) {
- grpc_completion_queue* cq = grpc_rb_get_wrapped_completion_queue(self);
- grpc_rb_completion_queue_destroy(cq);
- RTYPEDDATA_DATA(self) = NULL;
- return Qnil;
-}
-
-/* Allocates a completion queue. */
-static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
- grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
- if (cq == NULL) {
- rb_raise(rb_eArgError, "could not create a completion queue: not sure why");
- }
- return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
-}
-
static void unblock_func(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
next_call->interrupted = 1;
}
-/* Blocks until the next event for given tag is available, and returns the
- * event. */
-grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
- VALUE timeout) {
+/* Does the same thing as grpc_completion_queue_pluck, while properly releasing
+ the GVL and handling interrupts */
+grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag,
+ gpr_timespec deadline, void *reserved) {
next_call_stack next_call;
MEMZERO(&next_call, next_call_stack, 1);
- TypedData_Get_Struct(self, grpc_completion_queue,
- &grpc_rb_completion_queue_data_type, next_call.cq);
- if (TYPE(timeout) == T_NIL) {
- next_call.timeout = gpr_inf_future(GPR_CLOCK_REALTIME);
- } else {
- next_call.timeout = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
- }
- if (TYPE(tag) == T_NIL) {
- next_call.tag = NULL;
- } else {
- next_call.tag = ROBJECT(tag);
- }
+ next_call.cq = queue;
+ next_call.timeout = deadline;
+ next_call.tag = tag;
next_call.event.type = GRPC_QUEUE_TIMEOUT;
+ (void)reserved;
/* Loop until we finish a pluck without an interruption. The internal
pluck function runs either until it is interrupted or it gets an
event, or time runs out.
@@ -210,27 +115,3 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
next_call.event.type == GRPC_QUEUE_TIMEOUT);
return next_call.event;
}
-
-void Init_grpc_completion_queue() {
- grpc_rb_cCompletionQueue =
- rb_define_class_under(grpc_rb_mGrpcCore, "CompletionQueue", rb_cObject);
-
- /* constructor: uses an alloc func without an initializer. Using a simple
- alloc func works here as the grpc header does not specify any args for
- this func, so no separate initialization step is necessary. */
- rb_define_alloc_func(grpc_rb_cCompletionQueue,
- grpc_rb_completion_queue_alloc);
-
- /* close: Provides a way to close the underlying file descriptor without
- waiting for ruby garbage collection. */
- rb_define_method(grpc_rb_cCompletionQueue, "close",
- grpc_rb_completion_queue_close, 0);
-}
-
-/* Gets the wrapped completion queue from the ruby wrapper */
-grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v) {
- grpc_completion_queue *cq = NULL;
- TypedData_Get_Struct(v, grpc_completion_queue,
- &grpc_rb_completion_queue_data_type, cq);
- return cq;
-}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 42de43c3fb..9f8f6aa5ff 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -41,15 +41,14 @@
/* Gets the wrapped completion queue from the ruby wrapper */
grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
+void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq);
+
/**
* Makes the implementation of CompletionQueue#pluck available in other files
*
* This avoids having code that holds the GIL repeated at multiple sites.
*/
-grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
- VALUE timeout);
-
-/* Initializes the CompletionQueue class. */
-void Init_grpc_completion_queue();
+grpc_event rb_completion_queue_pluck(grpc_completion_queue *queue, void *tag,
+ gpr_timespec deadline, void *reserved);
#endif /* GRPC_RB_COMPLETION_QUEUE_H_ */
diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c
new file mode 100644
index 0000000000..0a3a215b1c
--- /dev/null
+++ b/src/ruby/ext/grpc/rb_compression_options.c
@@ -0,0 +1,464 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <ruby/ruby.h>
+
+#include "rb_compression_options.h"
+#include "rb_grpc_imports.generated.h"
+
+#include <grpc/compression.h>
+#include <grpc/grpc.h>
+#include <grpc/impl/codegen/alloc.h>
+#include <grpc/impl/codegen/compression_types.h>
+#include <grpc/impl/codegen/grpc_types.h>
+#include <string.h>
+
+#include "rb_grpc.h"
+
+static VALUE grpc_rb_cCompressionOptions = Qnil;
+
+/* Ruby Ids for the names of valid compression levels. */
+static VALUE id_compress_level_none = Qnil;
+static VALUE id_compress_level_low = Qnil;
+static VALUE id_compress_level_medium = Qnil;
+static VALUE id_compress_level_high = Qnil;
+
+/* grpc_rb_compression_options wraps a grpc_compression_options.
+ * It can be used to get the channel argument key-values for specific
+ * compression settings. */
+
+/* Note that ruby objects of this type don't carry any state in other
+ * Ruby objects and don't have a mark for GC. */
+typedef struct grpc_rb_compression_options {
+ /* The actual compression options that's being wrapped */
+ grpc_compression_options *wrapped;
+} grpc_rb_compression_options;
+
+/* Destroys the compression options instances and free the
+ * wrapped grpc compression options. */
+static void grpc_rb_compression_options_free(void *p) {
+ grpc_rb_compression_options *wrapper = NULL;
+ if (p == NULL) {
+ return;
+ };
+ wrapper = (grpc_rb_compression_options *)p;
+
+ if (wrapper->wrapped != NULL) {
+ gpr_free(wrapper->wrapped);
+ wrapper->wrapped = NULL;
+ }
+
+ xfree(p);
+}
+
+/* Ruby recognized data type for the CompressionOptions class. */
+static rb_data_type_t grpc_rb_compression_options_data_type = {
+ "grpc_compression_options",
+ {NULL,
+ grpc_rb_compression_options_free,
+ GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {NULL, NULL}},
+ NULL,
+ NULL,
+#ifdef RUBY_TYPED_FREE_IMMEDIATELY
+ RUBY_TYPED_FREE_IMMEDIATELY
+#endif
+};
+
+/* Allocates CompressionOptions instances.
+ Allocate the wrapped grpc compression options and
+ initialize it here too. */
+static VALUE grpc_rb_compression_options_alloc(VALUE cls) {
+ grpc_rb_compression_options *wrapper =
+ gpr_malloc(sizeof(grpc_rb_compression_options));
+ wrapper->wrapped = NULL;
+ wrapper->wrapped = gpr_malloc(sizeof(grpc_compression_options));
+ grpc_compression_options_init(wrapper->wrapped);
+
+ return TypedData_Wrap_Struct(cls, &grpc_rb_compression_options_data_type,
+ wrapper);
+}
+
+/* Disables a compression algorithm, given the GRPC core internal number of a
+ * compression algorithm. */
+VALUE grpc_rb_compression_options_disable_compression_algorithm_internal(
+ VALUE self, VALUE algorithm_to_disable) {
+ grpc_compression_algorithm compression_algorithm = 0;
+ grpc_rb_compression_options *wrapper = NULL;
+
+ TypedData_Get_Struct(self, grpc_rb_compression_options,
+ &grpc_rb_compression_options_data_type, wrapper);
+ compression_algorithm =
+ (grpc_compression_algorithm)NUM2INT(algorithm_to_disable);
+
+ grpc_compression_options_disable_algorithm(wrapper->wrapped,
+ compression_algorithm);
+
+ return Qnil;
+}
+
+/* Gets the compression internal enum value of a compression level given its
+ * name. */
+grpc_compression_level grpc_rb_compression_options_level_name_to_value_internal(
+ VALUE level_name) {
+ Check_Type(level_name, T_SYMBOL);
+
+ /* Check the compression level of the name passed in, and see which macro
+ * from the GRPC core header files match. */
+ if (id_compress_level_none == SYM2ID(level_name)) {
+ return GRPC_COMPRESS_LEVEL_NONE;
+ } else if (id_compress_level_low == SYM2ID(level_name)) {
+ return GRPC_COMPRESS_LEVEL_LOW;
+ } else if (id_compress_level_medium == SYM2ID(level_name)) {
+ return GRPC_COMPRESS_LEVEL_MED;
+ } else if (id_compress_level_high == SYM2ID(level_name)) {
+ return GRPC_COMPRESS_LEVEL_HIGH;
+ }
+
+ rb_raise(rb_eArgError,
+ "Unrecognized compression level name."
+ "Valid compression level names are none, low, medium, and high.");
+
+ /* Dummy return statement. */
+ return GRPC_COMPRESS_LEVEL_NONE;
+}
+
+/* Sets the default compression level, given the name of a compression level.
+ * Throws an error if no algorithm matched. */
+void grpc_rb_compression_options_set_default_level(
+ grpc_compression_options *options, VALUE new_level_name) {
+ options->default_level.level =
+ grpc_rb_compression_options_level_name_to_value_internal(new_level_name);
+ options->default_level.is_set = 1;
+}
+
+/* Gets the internal value of a compression algorithm suitable as the value
+ * in a GRPC core channel arguments hash.
+ * algorithm_value is an out parameter.
+ * Raises an error if the name of the algorithm passed in is invalid. */
+void grpc_rb_compression_options_algorithm_name_to_value_internal(
+ grpc_compression_algorithm *algorithm_value, VALUE algorithm_name) {
+ char *name_str = NULL;
+ long name_len = 0;
+ VALUE algorithm_name_as_string = Qnil;
+
+ Check_Type(algorithm_name, T_SYMBOL);
+
+ /* Convert the algorithm symbol to a ruby string, so that we can get the
+ * correct C string out of it. */
+ algorithm_name_as_string = rb_funcall(algorithm_name, rb_intern("to_s"), 0);
+
+ name_str = RSTRING_PTR(algorithm_name_as_string);
+ name_len = RSTRING_LEN(algorithm_name_as_string);
+
+ /* Raise an error if the name isn't recognized as a compression algorithm by
+ * the algorithm parse function
+ * in GRPC core. */
+ if (!grpc_compression_algorithm_parse(name_str, name_len, algorithm_value)) {
+ rb_raise(rb_eNameError, "Invalid compression algorithm name: %s",
+ StringValueCStr(algorithm_name_as_string));
+ }
+}
+
+/* Indicates whether a given algorithm is enabled on this instance, given the
+ * readable algorithm name. */
+VALUE grpc_rb_compression_options_is_algorithm_enabled(VALUE self,
+ VALUE algorithm_name) {
+ grpc_rb_compression_options *wrapper = NULL;
+ grpc_compression_algorithm internal_algorithm_value;
+
+ TypedData_Get_Struct(self, grpc_rb_compression_options,
+ &grpc_rb_compression_options_data_type, wrapper);
+ grpc_rb_compression_options_algorithm_name_to_value_internal(
+ &internal_algorithm_value, algorithm_name);
+
+ if (grpc_compression_options_is_algorithm_enabled(wrapper->wrapped,
+ internal_algorithm_value)) {
+ return Qtrue;
+ }
+ return Qfalse;
+}
+
+/* Sets the default algorithm to the name of the algorithm passed in.
+ * Raises an error if the name is not a valid compression algorithm name. */
+void grpc_rb_compression_options_set_default_algorithm(
+ grpc_compression_options *options, VALUE algorithm_name) {
+ grpc_rb_compression_options_algorithm_name_to_value_internal(
+ &options->default_algorithm.algorithm, algorithm_name);
+ options->default_algorithm.is_set = 1;
+}
+
+/* Disables an algorithm on the current instance, given the name of an
+ * algorithm.
+ * Fails if the algorithm name is invalid. */
+void grpc_rb_compression_options_disable_algorithm(
+ grpc_compression_options *compression_options, VALUE algorithm_name) {
+ grpc_compression_algorithm internal_algorithm_value;
+
+ grpc_rb_compression_options_algorithm_name_to_value_internal(
+ &internal_algorithm_value, algorithm_name);
+ grpc_compression_options_disable_algorithm(compression_options,
+ internal_algorithm_value);
+}
+
+/* Provides a ruby hash of GRPC core channel argument key-values that
+ * correspond to the compression settings on this instance. */
+VALUE grpc_rb_compression_options_to_hash(VALUE self) {
+ grpc_rb_compression_options *wrapper = NULL;
+ grpc_compression_options *compression_options = NULL;
+ VALUE channel_arg_hash = rb_hash_new();
+ VALUE key = Qnil;
+ VALUE value = Qnil;
+
+ TypedData_Get_Struct(self, grpc_rb_compression_options,
+ &grpc_rb_compression_options_data_type, wrapper);
+ compression_options = wrapper->wrapped;
+
+ /* Add key-value pairs to the new Ruby hash. It can be used
+ * as GRPC core channel arguments. */
+ if (compression_options->default_level.is_set) {
+ key = rb_str_new2(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL);
+ value = INT2NUM((int)compression_options->default_level.level);
+ rb_hash_aset(channel_arg_hash, key, value);
+ }
+
+ if (compression_options->default_algorithm.is_set) {
+ key = rb_str_new2(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM);
+ value = INT2NUM((int)compression_options->default_algorithm.algorithm);
+ rb_hash_aset(channel_arg_hash, key, value);
+ }
+
+ key = rb_str_new2(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET);
+ value = INT2NUM((int)compression_options->enabled_algorithms_bitset);
+ rb_hash_aset(channel_arg_hash, key, value);
+
+ return channel_arg_hash;
+}
+
+/* Converts an internal enum level value to a readable level name.
+ * Fails if the level value is invalid. */
+VALUE grpc_rb_compression_options_level_value_to_name_internal(
+ grpc_compression_level compression_value) {
+ switch (compression_value) {
+ case GRPC_COMPRESS_LEVEL_NONE:
+ return ID2SYM(id_compress_level_none);
+ case GRPC_COMPRESS_LEVEL_LOW:
+ return ID2SYM(id_compress_level_low);
+ case GRPC_COMPRESS_LEVEL_MED:
+ return ID2SYM(id_compress_level_medium);
+ case GRPC_COMPRESS_LEVEL_HIGH:
+ return ID2SYM(id_compress_level_high);
+ default:
+ rb_raise(
+ rb_eArgError,
+ "Failed to convert compression level value to name for value: %d",
+ (int)compression_value);
+ }
+}
+
+/* Converts an algorithm internal enum value to a readable name.
+ * Fails if the enum value is invalid. */
+VALUE grpc_rb_compression_options_algorithm_value_to_name_internal(
+ grpc_compression_algorithm internal_value) {
+ char *algorithm_name = NULL;
+
+ if (!grpc_compression_algorithm_name(internal_value, &algorithm_name)) {
+ rb_raise(rb_eArgError, "Failed to convert algorithm value to name");
+ }
+
+ return ID2SYM(rb_intern(algorithm_name));
+}
+
+/* Gets the readable name of the default algorithm if one has been set.
+ * Returns nil if no algorithm has been set. */
+VALUE grpc_rb_compression_options_get_default_algorithm(VALUE self) {
+ grpc_compression_algorithm internal_value;
+ grpc_rb_compression_options *wrapper = NULL;
+
+ TypedData_Get_Struct(self, grpc_rb_compression_options,
+ &grpc_rb_compression_options_data_type, wrapper);
+
+ if (wrapper->wrapped->default_algorithm.is_set) {
+ internal_value = wrapper->wrapped->default_algorithm.algorithm;
+ return grpc_rb_compression_options_algorithm_value_to_name_internal(
+ internal_value);
+ }
+
+ return Qnil;
+}
+
+/* Gets the internal value of the default compression level that is to be passed
+ * to the GRPC core as a channel argument value.
+ * A nil return value means that it hasn't been set. */
+VALUE grpc_rb_compression_options_get_default_level(VALUE self) {
+ grpc_compression_level internal_value;
+ grpc_rb_compression_options *wrapper = NULL;
+
+ TypedData_Get_Struct(self, grpc_rb_compression_options,
+ &grpc_rb_compression_options_data_type, wrapper);
+
+ if (wrapper->wrapped->default_level.is_set) {
+ internal_value = wrapper->wrapped->default_level.level;
+ return grpc_rb_compression_options_level_value_to_name_internal(
+ internal_value);
+ }
+
+ return Qnil;
+}
+
+/* Gets a list of the disabled algorithms as readable names.
+ * Returns an empty list if no algorithms have been disabled. */
+VALUE grpc_rb_compression_options_get_disabled_algorithms(VALUE self) {
+ VALUE disabled_algorithms = rb_ary_new();
+ grpc_compression_algorithm internal_value;
+ grpc_rb_compression_options *wrapper = NULL;
+
+ TypedData_Get_Struct(self, grpc_rb_compression_options,
+ &grpc_rb_compression_options_data_type, wrapper);
+
+ for (internal_value = GRPC_COMPRESS_NONE;
+ internal_value < GRPC_COMPRESS_ALGORITHMS_COUNT; internal_value++) {
+ if (!grpc_compression_options_is_algorithm_enabled(wrapper->wrapped,
+ internal_value)) {
+ rb_ary_push(disabled_algorithms,
+ grpc_rb_compression_options_algorithm_value_to_name_internal(
+ internal_value));
+ }
+ }
+ return disabled_algorithms;
+}
+
+/* Initializes the compression options wrapper.
+ * Takes an optional hash parameter.
+ *
+ * Example call-seq:
+ * options = CompressionOptions.new(
+ * default_level: :none,
+ * disabled_algorithms: [:gzip]
+ * )
+ * channel_arg hash = Hash.new[...]
+ * channel_arg_hash_with_compression_options = channel_arg_hash.merge(options)
+ */
+VALUE grpc_rb_compression_options_init(int argc, VALUE *argv, VALUE self) {
+ grpc_rb_compression_options *wrapper = NULL;
+ VALUE default_algorithm = Qnil;
+ VALUE default_level = Qnil;
+ VALUE disabled_algorithms = Qnil;
+ VALUE algorithm_name = Qnil;
+ VALUE hash_arg = Qnil;
+
+ rb_scan_args(argc, argv, "01", &hash_arg);
+
+ /* Check if the hash parameter was passed, or if invalid arguments were
+ * passed. */
+ if (hash_arg == Qnil) {
+ return self;
+ } else if (TYPE(hash_arg) != T_HASH || argc > 1) {
+ rb_raise(rb_eArgError,
+ "Invalid arguments. Expecting optional hash parameter");
+ }
+
+ TypedData_Get_Struct(self, grpc_rb_compression_options,
+ &grpc_rb_compression_options_data_type, wrapper);
+
+ /* Set the default algorithm if one was chosen. */
+ default_algorithm =
+ rb_hash_aref(hash_arg, ID2SYM(rb_intern("default_algorithm")));
+ if (default_algorithm != Qnil) {
+ grpc_rb_compression_options_set_default_algorithm(wrapper->wrapped,
+ default_algorithm);
+ }
+
+ /* Set the default level if one was chosen. */
+ default_level = rb_hash_aref(hash_arg, ID2SYM(rb_intern("default_level")));
+ if (default_level != Qnil) {
+ grpc_rb_compression_options_set_default_level(wrapper->wrapped,
+ default_level);
+ }
+
+ /* Set the disabled algorithms if any were chosen. */
+ disabled_algorithms =
+ rb_hash_aref(hash_arg, ID2SYM(rb_intern("disabled_algorithms")));
+ if (disabled_algorithms != Qnil) {
+ Check_Type(disabled_algorithms, T_ARRAY);
+
+ for (int i = 0; i < RARRAY_LEN(disabled_algorithms); i++) {
+ algorithm_name = rb_ary_entry(disabled_algorithms, i);
+ grpc_rb_compression_options_disable_algorithm(wrapper->wrapped,
+ algorithm_name);
+ }
+ }
+
+ return self;
+}
+
+void Init_grpc_compression_options() {
+ grpc_rb_cCompressionOptions = rb_define_class_under(
+ grpc_rb_mGrpcCore, "CompressionOptions", rb_cObject);
+
+ /* Allocates an object managed by the ruby runtime. */
+ rb_define_alloc_func(grpc_rb_cCompressionOptions,
+ grpc_rb_compression_options_alloc);
+
+ /* Initializes the ruby wrapper. #new method takes an optional hash argument.
+ */
+ rb_define_method(grpc_rb_cCompressionOptions, "initialize",
+ grpc_rb_compression_options_init, -1);
+
+ /* Methods for getting the default algorithm, default level, and disabled
+ * algorithms as readable names. */
+ rb_define_method(grpc_rb_cCompressionOptions, "default_algorithm",
+ grpc_rb_compression_options_get_default_algorithm, 0);
+ rb_define_method(grpc_rb_cCompressionOptions, "default_level",
+ grpc_rb_compression_options_get_default_level, 0);
+ rb_define_method(grpc_rb_cCompressionOptions, "disabled_algorithms",
+ grpc_rb_compression_options_get_disabled_algorithms, 0);
+
+ /* Determines whether or not an algorithm is enabled, given a readable
+ * algorithm name.*/
+ rb_define_method(grpc_rb_cCompressionOptions, "algorithm_enabled?",
+ grpc_rb_compression_options_is_algorithm_enabled, 1);
+
+ /* Provides a hash of the compression settings suitable
+ * for passing to server or channel args. */
+ rb_define_method(grpc_rb_cCompressionOptions, "to_hash",
+ grpc_rb_compression_options_to_hash, 0);
+ rb_define_alias(grpc_rb_cCompressionOptions, "to_channel_arg_hash",
+ "to_hash");
+
+ /* Ruby ids for the names of the different compression levels. */
+ id_compress_level_none = rb_intern("none");
+ id_compress_level_low = rb_intern("low");
+ id_compress_level_medium = rb_intern("medium");
+ id_compress_level_high = rb_intern("high");
+}
diff --git a/src/ruby/ext/grpc/rb_compression_options.h b/src/ruby/ext/grpc/rb_compression_options.h
new file mode 100644
index 0000000000..4d5a924786
--- /dev/null
+++ b/src/ruby/ext/grpc/rb_compression_options.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_RB_COMPRESSION_OPTIONS_H_
+#define GRPC_RB_COMPRESSION_OPTIONS_H_
+
+#include <ruby/ruby.h>
+
+#include <grpc/grpc.h>
+
+/* Initializes the compression options ruby wrapper. */
+void Init_grpc_compression_options();
+
+#endif /* GRPC_RB_COMPRESSION_OPTIONS_H_ */
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 9246893f9f..17cd165a91 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -46,10 +46,10 @@
#include "rb_call_credentials.h"
#include "rb_channel.h"
#include "rb_channel_credentials.h"
-#include "rb_completion_queue.h"
#include "rb_loader.h"
#include "rb_server.h"
#include "rb_server_credentials.h"
+#include "rb_compression_options.h"
static VALUE grpc_rb_cTimeVal = Qnil;
@@ -85,7 +85,7 @@ VALUE grpc_rb_cannot_init(VALUE self) {
VALUE grpc_rb_cannot_init_copy(VALUE copy, VALUE self) {
(void)self;
rb_raise(rb_eTypeError,
- "initialization of %s only allowed from the gRPC native layer",
+ "Copy initialization of %s is not supported",
rb_obj_classname(copy));
return Qnil;
}
@@ -221,7 +221,7 @@ static VALUE grpc_rb_time_val_to_time(VALUE self) {
time_const);
real_time = gpr_convert_clock_type(*time_const, GPR_CLOCK_REALTIME);
return rb_funcall(rb_cTime, id_at, 2, INT2NUM(real_time.tv_sec),
- INT2NUM(real_time.tv_nsec));
+ INT2NUM(real_time.tv_nsec / 1000));
}
/* Invokes inspect on the ctime version of the time val. */
@@ -318,7 +318,7 @@ void Init_grpc_c() {
grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core");
grpc_rb_sNewServerRpc =
rb_struct_define("NewServerRpc", "method", "host",
- "deadline", "metadata", "call", "cq", NULL);
+ "deadline", "metadata", "call", NULL);
grpc_rb_sStatus =
rb_struct_define("Status", "code", "details", "metadata", NULL);
sym_code = ID2SYM(rb_intern("code"));
@@ -326,7 +326,6 @@ void Init_grpc_c() {
sym_metadata = ID2SYM(rb_intern("metadata"));
Init_grpc_channel();
- Init_grpc_completion_queue();
Init_grpc_call();
Init_grpc_call_credentials();
Init_grpc_channel_credentials();
@@ -334,4 +333,5 @@ void Init_grpc_c() {
Init_grpc_server_credentials();
Init_grpc_status_codes();
Init_grpc_time_consts();
+ Init_grpc_compression_options();
}
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 4d9707638f..9748cb576b 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -128,6 +128,7 @@ grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_from_fd_import;
grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
+grpc_use_signal_type grpc_use_signal_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -399,6 +400,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
grpc_insecure_channel_create_from_fd_import = (grpc_insecure_channel_create_from_fd_type) GetProcAddress(library, "grpc_insecure_channel_create_from_fd");
grpc_server_add_insecure_channel_from_fd_import = (grpc_server_add_insecure_channel_from_fd_type) GetProcAddress(library, "grpc_server_add_insecure_channel_from_fd");
+ grpc_use_signal_import = (grpc_use_signal_type) GetProcAddress(library, "grpc_use_signal");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index 072e29c232..6f0974e31b 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -335,6 +335,9 @@ extern grpc_insecure_channel_create_from_fd_type grpc_insecure_channel_create_fr
typedef void(*grpc_server_add_insecure_channel_from_fd_type)(grpc_server *server, grpc_completion_queue *cq, int fd);
extern grpc_server_add_insecure_channel_from_fd_type grpc_server_add_insecure_channel_from_fd_import;
#define grpc_server_add_insecure_channel_from_fd grpc_server_add_insecure_channel_from_fd_import
+typedef void(*grpc_use_signal_type)(int signum);
+extern grpc_use_signal_type grpc_use_signal_import;
+#define grpc_use_signal grpc_use_signal_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
@@ -467,7 +470,7 @@ extern grpc_byte_buffer_length_type grpc_byte_buffer_length_import;
typedef void(*grpc_byte_buffer_destroy_type)(grpc_byte_buffer *byte_buffer);
extern grpc_byte_buffer_destroy_type grpc_byte_buffer_destroy_import;
#define grpc_byte_buffer_destroy grpc_byte_buffer_destroy_import
-typedef void(*grpc_byte_buffer_reader_init_type)(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer);
+typedef int(*grpc_byte_buffer_reader_init_type)(grpc_byte_buffer_reader *reader, grpc_byte_buffer *buffer);
extern grpc_byte_buffer_reader_init_type grpc_byte_buffer_reader_init_import;
#define grpc_byte_buffer_reader_init grpc_byte_buffer_reader_init_import
typedef void(*grpc_byte_buffer_reader_destroy_type)(grpc_byte_buffer_reader *reader);
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index f108b8acfc..2a6a246e67 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -38,6 +38,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/log.h>
#include "rb_call.h"
#include "rb_channel_args.h"
#include "rb_completion_queue.h"
@@ -53,53 +54,51 @@ static ID id_at;
/* id_insecure_server is used to indicate that a server is insecure */
static VALUE id_insecure_server;
-/* grpc_rb_server wraps a grpc_server. It provides a peer ruby object,
- 'mark' to minimize copying when a server is created from ruby. */
+/* grpc_rb_server wraps a grpc_server. */
typedef struct grpc_rb_server {
- /* Holder of ruby objects involved in constructing the server */
- VALUE mark;
/* The actual server */
grpc_server *wrapped;
grpc_completion_queue *queue;
} grpc_rb_server;
+static void destroy_server(grpc_rb_server *server, gpr_timespec deadline) {
+ grpc_event ev;
+ if (server->wrapped != NULL) {
+ grpc_server_shutdown_and_notify(server->wrapped, server->queue, NULL);
+ ev = rb_completion_queue_pluck(server->queue, NULL, deadline, NULL);
+ if (ev.type == GRPC_QUEUE_TIMEOUT) {
+ grpc_server_cancel_all_calls(server->wrapped);
+ rb_completion_queue_pluck(server->queue, NULL,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ }
+ grpc_server_destroy(server->wrapped);
+ grpc_rb_completion_queue_destroy(server->queue);
+ server->wrapped = NULL;
+ server->queue = NULL;
+ }
+}
+
/* Destroys server instances. */
static void grpc_rb_server_free(void *p) {
grpc_rb_server *svr = NULL;
+ gpr_timespec deadline;
if (p == NULL) {
return;
};
svr = (grpc_rb_server *)p;
- /* Deletes the wrapped object if the mark object is Qnil, which indicates
- that no other object is the actual owner. */
- /* grpc_server_shutdown does not exist. Change this to something that does
- or delete it */
- if (svr->wrapped != NULL && svr->mark == Qnil) {
- // grpc_server_shutdown(svr->wrapped);
- // Aborting to indicate a bug
- abort();
- grpc_server_destroy(svr->wrapped);
- }
+ deadline = gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_seconds(2, GPR_TIMESPAN));
- xfree(p);
-}
+ destroy_server(svr, deadline);
-/* Protects the mark object from GC */
-static void grpc_rb_server_mark(void *p) {
- grpc_rb_server *server = NULL;
- if (p == NULL) {
- return;
- }
- server = (grpc_rb_server *)p;
- if (server->mark != Qnil) {
- rb_gc_mark(server->mark);
- }
+ xfree(p);
}
static const rb_data_type_t grpc_rb_server_data_type = {
"grpc_server",
- {grpc_rb_server_mark, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE,
+ {GRPC_RB_GC_NOT_MARKED, grpc_rb_server_free, GRPC_RB_MEMSIZE_UNAVAILABLE,
{NULL, NULL}},
NULL,
NULL,
@@ -116,23 +115,20 @@ static const rb_data_type_t grpc_rb_server_data_type = {
static VALUE grpc_rb_server_alloc(VALUE cls) {
grpc_rb_server *wrapper = ALLOC(grpc_rb_server);
wrapper->wrapped = NULL;
- wrapper->mark = Qnil;
return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
}
/*
call-seq:
- cq = CompletionQueue.new
- server = Server.new(cq, {'arg1': 'value1'})
+ server = Server.new({'arg1': 'value1'})
Initializes server instances. */
-static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
- grpc_completion_queue *cq = NULL;
+static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
+ grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
grpc_rb_server *wrapper = NULL;
grpc_server *srv = NULL;
grpc_channel_args args;
MEMZERO(&args, grpc_channel_args, 1);
- cq = grpc_rb_get_wrapped_completion_queue(cqueue);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
wrapper);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
@@ -148,41 +144,9 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
wrapper->wrapped = srv;
wrapper->queue = cq;
- /* Add the cq as the server's mark object. This ensures the ruby cq can't be
- GCed before the server */
- wrapper->mark = cqueue;
return self;
}
-/* Clones Server instances.
-
- Gives Server a consistent implementation of Ruby's object copy/dup
- protocol. */
-static VALUE grpc_rb_server_init_copy(VALUE copy, VALUE orig) {
- grpc_rb_server *orig_srv = NULL;
- grpc_rb_server *copy_srv = NULL;
-
- if (copy == orig) {
- return copy;
- }
-
- /* Raise an error if orig is not a server object or a subclass. */
- if (TYPE(orig) != T_DATA ||
- RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_server_free) {
- rb_raise(rb_eTypeError, "not a %s", rb_obj_classname(grpc_rb_cServer));
- }
-
- TypedData_Get_Struct(orig, grpc_rb_server, &grpc_rb_server_data_type,
- orig_srv);
- TypedData_Get_Struct(copy, grpc_rb_server, &grpc_rb_server_data_type,
- copy_srv);
-
- /* use ruby's MEMCPY to make a byte-for-byte copy of the server wrapper
- object. */
- MEMCPY(copy_srv, orig_srv, grpc_rb_server, 1);
- return copy;
-}
-
/* request_call_stack holds various values used by the
* grpc_rb_server_request_call function */
typedef struct request_call_stack {
@@ -208,65 +172,57 @@ static void grpc_request_call_stack_cleanup(request_call_stack* st) {
}
/* call-seq:
- cq = CompletionQueue.new
- tag = Object.new
- timeout = 10
- server.request_call(cqueue, tag, timeout)
+ server.request_call
Requests notification of a new call on a server. */
-static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue,
- VALUE tag_new, VALUE timeout) {
+static VALUE grpc_rb_server_request_call(VALUE self) {
grpc_rb_server *s = NULL;
grpc_call *call = NULL;
grpc_event ev;
grpc_call_error err;
request_call_stack st;
VALUE result;
+ void *tag = (void*)&st;
+ grpc_completion_queue *call_queue = grpc_completion_queue_create(NULL);
gpr_timespec deadline;
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
if (s->wrapped == NULL) {
rb_raise(rb_eRuntimeError, "destroyed!");
return Qnil;
- } else {
- grpc_request_call_stack_init(&st);
- /* call grpc_server_request_call, then wait for it to complete using
- * pluck_event */
- err = grpc_server_request_call(
- s->wrapped, &call, &st.details, &st.md_ary,
- grpc_rb_get_wrapped_completion_queue(cqueue),
- grpc_rb_get_wrapped_completion_queue(s->mark),
- ROBJECT(tag_new));
- if (err != GRPC_CALL_OK) {
- grpc_request_call_stack_cleanup(&st);
- rb_raise(grpc_rb_eCallError,
- "grpc_server_request_call failed: %s (code=%d)",
- grpc_call_error_detail_of(err), err);
- return Qnil;
- }
-
- ev = grpc_rb_completion_queue_pluck_event(s->mark, tag_new, timeout);
- if (ev.type == GRPC_QUEUE_TIMEOUT) {
- grpc_request_call_stack_cleanup(&st);
- return Qnil;
- }
- if (!ev.success) {
- grpc_request_call_stack_cleanup(&st);
- rb_raise(grpc_rb_eCallError, "request_call completion failed");
- return Qnil;
- }
+ }
+ grpc_request_call_stack_init(&st);
+ /* call grpc_server_request_call, then wait for it to complete using
+ * pluck_event */
+ err = grpc_server_request_call(
+ s->wrapped, &call, &st.details, &st.md_ary,
+ call_queue, s->queue, tag);
+ if (err != GRPC_CALL_OK) {
+ grpc_request_call_stack_cleanup(&st);
+ rb_raise(grpc_rb_eCallError,
+ "grpc_server_request_call failed: %s (code=%d)",
+ grpc_call_error_detail_of(err), err);
+ return Qnil;
+ }
- /* build the NewServerRpc struct result */
- deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
- result = rb_struct_new(
- grpc_rb_sNewServerRpc, rb_str_new2(st.details.method),
- rb_str_new2(st.details.host),
- rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
- INT2NUM(deadline.tv_nsec)),
- grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call), cqueue, NULL);
+ ev = rb_completion_queue_pluck(s->queue, tag,
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
+ if (!ev.success) {
grpc_request_call_stack_cleanup(&st);
- return result;
+ rb_raise(grpc_rb_eCallError, "request_call completion failed");
+ return Qnil;
}
- return Qnil;
+
+ /* build the NewServerRpc struct result */
+ deadline = gpr_convert_clock_type(st.details.deadline, GPR_CLOCK_REALTIME);
+ result = rb_struct_new(
+ grpc_rb_sNewServerRpc, rb_str_new2(st.details.method),
+ rb_str_new2(st.details.host),
+ rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
+ INT2NUM(deadline.tv_nsec / 1000)),
+ grpc_rb_md_ary_to_h(&st.md_ary), grpc_rb_wrap_call(call, call_queue),
+ NULL);
+ grpc_request_call_stack_cleanup(&st);
+ return result;
}
static VALUE grpc_rb_server_start(VALUE self) {
@@ -282,41 +238,33 @@ static VALUE grpc_rb_server_start(VALUE self) {
/*
call-seq:
- cq = CompletionQueue.new
- server = Server.new(cq, {'arg1': 'value1'})
+ server = Server.new({'arg1': 'value1'})
... // do stuff with server
...
... // to shutdown the server
- server.destroy(cq)
+ server.destroy()
... // to shutdown the server with a timeout
- server.destroy(cq, timeout)
+ server.destroy(timeout)
Destroys server instances. */
static VALUE grpc_rb_server_destroy(int argc, VALUE *argv, VALUE self) {
- VALUE cqueue = Qnil;
VALUE timeout = Qnil;
- grpc_completion_queue *cq = NULL;
- grpc_event ev;
+ gpr_timespec deadline;
grpc_rb_server *s = NULL;
- /* "11" == 1 mandatory args, 1 (timeout) is optional */
- rb_scan_args(argc, argv, "11", &cqueue, &timeout);
- cq = grpc_rb_get_wrapped_completion_queue(cqueue);
+ /* "01" == 0 mandatory args, 1 (timeout) is optional */
+ rb_scan_args(argc, argv, "01", &timeout);
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
-
- if (s->wrapped != NULL) {
- grpc_server_shutdown_and_notify(s->wrapped, cq, NULL);
- ev = grpc_rb_completion_queue_pluck_event(cqueue, Qnil, timeout);
- if (!ev.success) {
- rb_warn("server shutdown failed, cancelling the calls, objects may leak");
- grpc_server_cancel_all_calls(s->wrapped);
- return Qfalse;
- }
- grpc_server_destroy(s->wrapped);
- s->wrapped = NULL;
+ if (TYPE(timeout) == T_NIL) {
+ deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ } else {
+ deadline = grpc_rb_time_timeval(timeout, /* absolute time*/ 0);
}
- return Qtrue;
+
+ destroy_server(s, deadline);
+
+ return Qnil;
}
/*
@@ -376,13 +324,13 @@ void Init_grpc_server() {
rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc);
/* Provides a ruby constructor and support for dup/clone. */
- rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 2);
+ rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
rb_define_method(grpc_rb_cServer, "initialize_copy",
- grpc_rb_server_init_copy, 1);
+ grpc_rb_cannot_init_copy, 1);
/* Add the server methods. */
rb_define_method(grpc_rb_cServer, "request_call",
- grpc_rb_server_request_call, 3);
+ grpc_rb_server_request_call, 0);
rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, -1);
rb_define_alias(grpc_rb_cServer, "close", "destroy");
diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c
index 3b0fb6c910..a44ce715ae 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.c
+++ b/src/ruby/ext/grpc/rb_server_credentials.c
@@ -38,6 +38,7 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/log.h>
#include "rb_grpc.h"
@@ -46,8 +47,8 @@
static VALUE grpc_rb_cServerCredentials = Qnil;
/* grpc_rb_server_credentials wraps a grpc_server_credentials. It provides a
- peer ruby object, 'mark' to minimize copying when a server credential is
- created from ruby. */
+ peer ruby object, 'mark' to hold references to objects involved in
+ constructing the server credentials. */
typedef struct grpc_rb_server_credentials {
/* Holder of ruby objects involved in constructing the server credentials */
VALUE mark;
@@ -111,36 +112,6 @@ static VALUE grpc_rb_server_credentials_alloc(VALUE cls) {
wrapper);
}
-/* Clones ServerCredentials instances.
-
- Gives ServerCredentials a consistent implementation of Ruby's object copy/dup
- protocol. */
-static VALUE grpc_rb_server_credentials_init_copy(VALUE copy, VALUE orig) {
- grpc_rb_server_credentials *orig_ch = NULL;
- grpc_rb_server_credentials *copy_ch = NULL;
-
- if (copy == orig) {
- return copy;
- }
-
- /* Raise an error if orig is not a server_credentials object or a subclass. */
- if (TYPE(orig) != T_DATA ||
- RDATA(orig)->dfree != (RUBY_DATA_FUNC)grpc_rb_server_credentials_free) {
- rb_raise(rb_eTypeError, "not a %s",
- rb_obj_classname(grpc_rb_cServerCredentials));
- }
-
- TypedData_Get_Struct(orig, grpc_rb_server_credentials,
- &grpc_rb_server_credentials_data_type, orig_ch);
- TypedData_Get_Struct(copy, grpc_rb_server_credentials,
- &grpc_rb_server_credentials_data_type, copy_ch);
-
- /* use ruby's MEMCPY to make a byte-for-byte copy of the server_credentials
- wrapper object. */
- MEMCPY(copy_ch, orig_ch, grpc_rb_server_credentials, 1);
- return copy;
-}
-
/* The attribute used on the mark object to preserve the pem_root_certs. */
static ID id_pem_root_certs;
@@ -270,7 +241,7 @@ void Init_grpc_server_credentials() {
rb_define_method(grpc_rb_cServerCredentials, "initialize",
grpc_rb_server_credentials_init, 3);
rb_define_method(grpc_rb_cServerCredentials, "initialize_copy",
- grpc_rb_server_credentials_init_copy, 1);
+ grpc_rb_cannot_init_copy, 1);
id_pem_key_certs = rb_intern("__pem_key_certs");
id_pem_root_certs = rb_intern("__pem_root_certs");
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index b03ddbc193..4260d85437 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -43,8 +43,7 @@ class Struct
GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present.
md = status.metadata
- with_sym_keys = Hash[md.each_pair.collect { |x, y| [x.to_sym, y] }]
- fail GRPC::BadStatus.new(status.code, status.details, with_sym_keys)
+ fail GRPC::BadStatus.new(status.code, status.details, md)
end
status
end
@@ -61,7 +60,7 @@ module GRPC
extend Forwardable
attr_reader(:deadline)
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
- :peer, :peer_cert
+ :peer, :peer_cert, :trailing_metadata
# client_invoke begins a client invocation.
#
@@ -75,17 +74,10 @@ module GRPC
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param call [Call] a call on which to start and invocation
- # @param q [CompletionQueue] the completion queue
# @param metadata [Hash] the metadata
- def self.client_invoke(call, q, metadata = {})
+ def self.client_invoke(call, metadata = {})
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
- unless q.is_a? Core::CompletionQueue
- fail(TypeError, '!Core::CompletionQueue')
- end
- metadata_tag = Object.new
- call.run_batch(q, metadata_tag, INFINITE_FUTURE,
- SEND_INITIAL_METADATA => metadata)
- metadata_tag
+ call.run_batch(SEND_INITIAL_METADATA => metadata)
end
# Creates an ActiveCall.
@@ -102,26 +94,21 @@ module GRPC
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
- # @param q [CompletionQueue] the completion queue used to accept
- # the call. This queue will be closed on call completion.
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
- # @param metadata_tag [Object] the object use obtain metadata for clients
- # @param started [true|false] indicates if the call has begun
- def initialize(call, q, marshal, unmarshal, deadline, started: true,
- metadata_tag: nil)
+ # @param started [true|false] indicates that metadata was sent
+ # @param metadata_received [true|false] indicates if metadata has already
+ # been received. Should always be true for server calls
+ def initialize(call, marshal, unmarshal, deadline, started: true,
+ metadata_received: false)
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
- unless q.is_a? Core::CompletionQueue
- fail(TypeError, '!Core::CompletionQueue')
- end
@call = call
- @cq = q
@deadline = deadline
@marshal = marshal
- @started = started
@unmarshal = unmarshal
- @metadata_tag = metadata_tag
+ @metadata_received = metadata_received
+ @metadata_sent = started
@op_notifier = nil
end
@@ -132,7 +119,7 @@ module GRPC
end
# cancelled indicates if the call was cancelled
- def cancelled
+ def cancelled?
!@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
end
@@ -168,8 +155,11 @@ module GRPC
SEND_CLOSE_FROM_CLIENT => nil
}
ops[RECV_STATUS_ON_CLIENT] = nil if assert_finished
- batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ batch_result = @call.run_batch(ops)
return unless assert_finished
+ unless batch_result.status.nil?
+ @call.trailing_metadata = batch_result.status.metadata
+ end
@call.status = batch_result.status
op_is_done
batch_result.check_status
@@ -179,20 +169,14 @@ module GRPC
#
# It blocks until the remote endpoint acknowledges by sending a status.
def finished
- batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE,
- RECV_STATUS_ON_CLIENT => nil)
+ batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
unless batch_result.status.nil?
- if @call.metadata.nil?
- @call.metadata = batch_result.status.metadata
- else
- @call.metadata.merge!(batch_result.status.metadata)
- end
+ @call.trailing_metadata = batch_result.status.metadata
end
@call.status = batch_result.status
op_is_done
batch_result.check_status
@call.close
- @cq.close
end
# remote_send sends a request to the remote endpoint.
@@ -203,9 +187,10 @@ module GRPC
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
+ # TODO(murgatroid99): ensure metadata was sent
GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
payload = marshalled ? req : @marshal.call(req)
- @call.run_batch(@cq, self, INFINITE_FUTURE, SEND_MESSAGE => payload)
+ @call.run_batch(SEND_MESSAGE => payload)
end
# send_status sends a status to the remote endpoint.
@@ -222,7 +207,7 @@ module GRPC
SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
}
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
- @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
+ @call.run_batch(ops)
nil
end
@@ -234,11 +219,11 @@ module GRPC
# raising BadStatus
def remote_read
ops = { RECV_MESSAGE => nil }
- ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
- batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
- unless @metadata_tag.nil?
+ ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
+ batch_result = @call.run_batch(ops)
+ unless @metadata_received
@call.metadata = batch_result.metadata
- @metadata_tag = nil
+ @metadata_received = true
end
GRPC.logger.debug("received req: #{batch_result}")
unless batch_result.nil? || batch_result.message.nil?
@@ -318,7 +303,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
- start_call(metadata) unless @started
+ start_call(metadata)
remote_send(req)
writes_done(false)
response = remote_read
@@ -342,7 +327,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
- start_call(metadata) unless @started
+ start_call(metadata)
requests.each { |r| remote_send(r) }
writes_done(false)
response = remote_read
@@ -368,7 +353,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {})
- start_call(metadata) unless @started
+ start_call(metadata)
remote_send(req)
writes_done(false)
replies = enum_for(:each_remote_read_then_finish)
@@ -407,10 +392,9 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
- start_call(metadata) unless @started
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal,
- metadata_tag: @metadata_tag)
- @metadata_tag = nil # run_on_client ensures metadata is read
+ start_call(metadata)
+ bd = BidiCall.new(@call, @marshal, @unmarshal,
+ metadata_received: @metadata_received)
bd.run_on_client(requests, @op_notifier, &blk)
end
@@ -426,7 +410,8 @@ module GRPC
#
# @param gen_each_reply [Proc] generates the BiDi stream replies
def run_server_bidi(gen_each_reply)
- bd = BidiCall.new(@call, @cq, @marshal, @unmarshal)
+ bd = BidiCall.new(@call, @marshal, @unmarshal,
+ metadata_received: @metadata_received)
bd.run_on_server(gen_each_reply)
end
@@ -449,9 +434,9 @@ module GRPC
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
def start_call(metadata = {})
- return if @started
- @metadata_tag = ActiveCall.client_invoke(@call, @cq, metadata)
- @started = true
+ return if @metadata_sent
+ @metadata_tag = ActiveCall.client_invoke(@call, metadata)
+ @metadata_sent = true
end
def self.view_class(*visible_methods)
@@ -468,18 +453,18 @@ module GRPC
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
- SingleReqView = view_class(:cancelled, :deadline, :metadata,
+ SingleReqView = view_class(:cancelled?, :deadline, :metadata,
:output_metadata, :peer, :peer_cert)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
- MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
+ MultiReqView = view_class(:cancelled?, :deadline, :each_queued_msg,
:each_remote_read, :metadata, :output_metadata)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
- Operation = view_class(:cancel, :cancelled, :deadline, :execute,
+ Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
:metadata, :status, :start_call, :wait, :write_flag,
- :write_flag=)
+ :write_flag=, :trailing_metadata)
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index 238f409a1d..425dc3e519 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -52,23 +52,18 @@ module GRPC
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
- # @param q [CompletionQueue] the completion queue used to accept
- # the call
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
- # @param metadata_tag [Object] tag object used to collect metadata
- def initialize(call, q, marshal, unmarshal, metadata_tag: nil)
+ # @param metadata_received [true|false] indicates if metadata has already
+ # been received. Should always be true for server calls
+ def initialize(call, marshal, unmarshal, metadata_received: false)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
- unless q.is_a? Core::CompletionQueue
- fail(ArgumentError, 'not a CompletionQueue')
- end
@call = call
- @cq = q
@marshal = marshal
@op_notifier = nil # signals completion on clients
@readq = Queue.new
@unmarshal = unmarshal
- @metadata_tag = metadata_tag
+ @metadata_received = metadata_received
@reads_complete = false
@writes_complete = false
@complete = false
@@ -124,7 +119,6 @@ module GRPC
@done_mutex.synchronize do
return unless @reads_complete && @writes_complete && !@complete
@call.close
- @cq.close
@complete = true
end
end
@@ -132,11 +126,11 @@ module GRPC
# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
- ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil?
- batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops)
- unless @metadata_tag.nil?
+ ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
+ batch_result = @call.run_batch(ops)
+ unless @metadata_received
@call.metadata = batch_result.metadata
- @metadata_tag = nil
+ @metadata_received = true
end
batch_result
end
@@ -161,20 +155,26 @@ module GRPC
def write_loop(requests, is_client: true)
GRPC.logger.debug('bidi-write-loop: starting')
- write_tag = Object.new
count = 0
requests.each do |req|
GRPC.logger.debug("bidi-write-loop: #{count}")
count += 1
payload = @marshal.call(req)
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_MESSAGE => payload)
+ # Fails if status already received
+ begin
+ @call.run_batch(SEND_MESSAGE => payload)
+ rescue GRPC::Core::CallError => e
+ # This is almost definitely caused by a status arriving while still
+ # writing. Don't re-throw the error
+ GRPC.logger.warn('bidi-write-loop: ended with error')
+ GRPC.logger.warn(e)
+ break
+ end
end
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
- @call.run_batch(@cq, write_tag, INFINITE_FUTURE,
- SEND_CLOSE_FROM_CLIENT => nil)
+ @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
GRPC.logger.debug('bidi-write-loop: done')
notify_done
@writes_complete = true
@@ -195,7 +195,6 @@ module GRPC
Thread.new do
GRPC.logger.debug('bidi-read-loop: starting')
begin
- read_tag = Object.new
count = 0
# queue the initial read before beginning the loop
loop do
@@ -208,8 +207,7 @@ module GRPC
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
if is_client
- batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE,
- RECV_STATUS_ON_CLIENT => nil)
+ batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
@call.status = batch_result.status
batch_result.check_status
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index cddca13d17..9d6bd3bf59 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -90,19 +90,16 @@ module GRPC
# when present, this is the default timeout used for calls
#
# @param host [String] the host the stub connects to
- # @param q [Core::CompletionQueue] used to wait for events - now deprecated
- # since each new active call gets its own separately
# @param creds [Core::ChannelCredentials|Symbol] the channel credentials, or
# :this_channel_is_insecure
# @param channel_override [Core::Channel] a pre-created channel
# @param timeout [Number] the default timeout to use in requests
# @param channel_args [Hash] the channel arguments
- def initialize(host, q, creds,
+ def initialize(host, creds,
channel_override: nil,
timeout: nil,
propagate_mask: nil,
channel_args: {})
- fail(TypeError, '!CompletionQueue') unless q.is_a?(Core::CompletionQueue)
@ch = ClientStub.setup_channel(channel_override, host, creds,
channel_args)
alt_host = channel_args[Core::Channel::SSL_TARGET]
@@ -441,15 +438,13 @@ module GRPC
deadline = from_relative_time(@timeout) if deadline.nil?
# Provide each new client call with its own completion queue
- call_queue = Core::CompletionQueue.new
- call = @ch.create_call(call_queue,
- parent, # parent call
+ call = @ch.create_call(parent, # parent call
@propagate_mask, # propagation options
method,
nil, # host use nil,
deadline)
call.set_credentials! credentials unless credentials.nil?
- ActiveCall.new(call, call_queue, marshal, unmarshal, deadline,
+ ActiveCall.new(call, marshal, unmarshal, deadline,
started: false)
end
end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ab7333d133..c92a532a50 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -159,16 +159,6 @@ module GRPC
# Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0.25
- # setup_cq is used by #initialize to constuct a Core::CompletionQueue from
- # its arguments.
- def self.setup_cq(alt_cq)
- return Core::CompletionQueue.new if alt_cq.nil?
- unless alt_cq.is_a? Core::CompletionQueue
- fail(TypeError, '!CompletionQueue')
- end
- alt_cq
- end
-
# setup_connect_md_proc is used by #initialize to validate the
# connect_md_proc.
def self.setup_connect_md_proc(a_proc)
@@ -191,10 +181,6 @@ module GRPC
# * pool_size: the size of the thread pool the server uses to run its
# threads
#
- # * completion_queue_override: when supplied, this will be used as the
- # completion_queue that the server uses to receive network events,
- # otherwise its creates a new instance itself
- #
# * creds: [GRPC::Core::ServerCredentials]
# the credentials used to secure the server
#
@@ -212,11 +198,9 @@ module GRPC
def initialize(pool_size:DEFAULT_POOL_SIZE,
max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
poll_period:DEFAULT_POLL_PERIOD,
- completion_queue_override:nil,
connect_md_proc:nil,
server_args:{})
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
- @cq = RpcServer.setup_cq(completion_queue_override)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@pool_size = pool_size
@@ -226,7 +210,7 @@ module GRPC
# running_state can take 4 values: :not_started, :running, :stopping, and
# :stopped. State transitions can only proceed in that order.
@running_state = :not_started
- @server = Core::Server.new(@cq, server_args)
+ @server = Core::Server.new(server_args)
end
# stops a running server
@@ -240,7 +224,7 @@ module GRPC
transition_running_state(:stopping)
end
deadline = from_relative_time(@poll_period)
- @server.close(@cq, deadline)
+ @server.close(deadline)
@pool.stop
end
@@ -355,7 +339,8 @@ module GRPC
return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
- c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
+ c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
+ metadata_received: true)
c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end
@@ -366,7 +351,8 @@ module GRPC
return an_rpc if rpc_descs.key?(mth)
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
- c = ActiveCall.new(an_rpc.call, an_rpc.cq, noop, noop, an_rpc.deadline)
+ c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
+ metadata_received: true)
c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
@@ -374,11 +360,9 @@ module GRPC
# handles calls to the server
def loop_handle_server_calls
fail 'not started' if running_state == :not_started
- loop_tag = Object.new
while running_state == :running
begin
- comp_queue = Core::CompletionQueue.new
- an_rpc = @server.request_call(comp_queue, loop_tag, INFINITE_FUTURE)
+ an_rpc = @server.request_call
break if (!an_rpc.nil?) && an_rpc.call.nil?
active_call = new_active_server_call(an_rpc)
unless active_call.nil?
@@ -410,15 +394,13 @@ module GRPC
return nil if an_rpc.nil? || an_rpc.call.nil?
# allow the metadata to be accessed from the call
- handle_call_tag = Object.new
an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
GRPC.logger.debug("call md is #{an_rpc.metadata}")
connect_md = nil
unless @connect_md_proc.nil?
connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
- an_rpc.call.run_batch(an_rpc.cq, handle_call_tag, INFINITE_FUTURE,
- SEND_INITIAL_METADATA => connect_md)
+ an_rpc.call.run_batch(SEND_INITIAL_METADATA => connect_md)
return nil unless available?(an_rpc)
return nil unless implemented?(an_rpc)
@@ -426,9 +408,9 @@ module GRPC
# Create the ActiveCall
GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
- c = ActiveCall.new(an_rpc.call, an_rpc.cq,
- rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
- an_rpc.deadline)
+ c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc,
+ rpc_desc.unmarshal_proc(:input), an_rpc.deadline,
+ metadata_received: true)
mth = an_rpc.method.to_sym
[c, mth]
end
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index f30242ee80..7cb9f1cc99 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -168,7 +168,7 @@ module GRPC
# @param kw [KeywordArgs] the channel arguments, plus any optional
# args for configuring the client's channel
def initialize(host, creds, **kw)
- super(host, Core::CompletionQueue.new, creds, **kw)
+ super(host, creds, **kw)
end
# Used define_method to add a method for each rpc_desc. Each method
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index 01c8c5ac8f..6e62af94d4 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -29,5 +29,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '0.15.0.dev'
+ VERSION = '1.1.0.dev'
end
diff --git a/src/ruby/pb/src/proto/grpc/testing/messages.rb b/src/ruby/pb/src/proto/grpc/testing/messages.rb
index 2bdfe0eade..e27ccd0dc0 100644
--- a/src/ruby/pb/src/proto/grpc/testing/messages.rb
+++ b/src/ruby/pb/src/proto/grpc/testing/messages.rb
@@ -4,6 +4,9 @@
require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
+ add_message "grpc.testing.BoolValue" do
+ optional :value, :bool, 1
+ end
add_message "grpc.testing.Payload" do
optional :type, :enum, 1, "grpc.testing.PayloadType"
optional :body, :bytes, 2
@@ -18,8 +21,9 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
optional :payload, :message, 3, "grpc.testing.Payload"
optional :fill_username, :bool, 4
optional :fill_oauth_scope, :bool, 5
- optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
+ optional :response_compressed, :message, 6, "grpc.testing.BoolValue"
optional :response_status, :message, 7, "grpc.testing.EchoStatus"
+ optional :expect_compressed, :message, 8, "grpc.testing.BoolValue"
end
add_message "grpc.testing.SimpleResponse" do
optional :payload, :message, 1, "grpc.testing.Payload"
@@ -28,6 +32,7 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
end
add_message "grpc.testing.StreamingInputCallRequest" do
optional :payload, :message, 1, "grpc.testing.Payload"
+ optional :expect_compressed, :message, 2, "grpc.testing.BoolValue"
end
add_message "grpc.testing.StreamingInputCallResponse" do
optional :aggregated_payload_size, :int32, 1
@@ -35,12 +40,12 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "grpc.testing.ResponseParameters" do
optional :size, :int32, 1
optional :interval_us, :int32, 2
+ optional :compressed, :message, 3, "grpc.testing.BoolValue"
end
add_message "grpc.testing.StreamingOutputCallRequest" do
optional :response_type, :enum, 1, "grpc.testing.PayloadType"
repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters"
optional :payload, :message, 3, "grpc.testing.Payload"
- optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
optional :response_status, :message, 7, "grpc.testing.EchoStatus"
end
add_message "grpc.testing.StreamingOutputCallResponse" do
@@ -55,18 +60,12 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
end
add_enum "grpc.testing.PayloadType" do
value :COMPRESSABLE, 0
- value :UNCOMPRESSABLE, 1
- value :RANDOM, 2
- end
- add_enum "grpc.testing.CompressionType" do
- value :NONE, 0
- value :GZIP, 1
- value :DEFLATE, 2
end
end
module Grpc
module Testing
+ BoolValue = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.BoolValue").msgclass
Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass
EchoStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EchoStatus").msgclass
SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass
@@ -79,6 +78,5 @@ module Grpc
ReconnectParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectParams").msgclass
ReconnectInfo = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectInfo").msgclass
PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule
- CompressionType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CompressionType").enummodule
end
end
diff --git a/src/ruby/pb/test/client.rb b/src/ruby/pb/test/client.rb
index b6695482a2..4c6d441dcb 100755
--- a/src/ruby/pb/test/client.rb
+++ b/src/ruby/pb/test/client.rb
@@ -52,9 +52,9 @@ require_relative '../../lib/grpc'
require 'googleauth'
require 'google/protobuf'
-require_relative 'proto/empty'
-require_relative 'proto/messages'
-require_relative 'proto/test_services'
+require_relative '../src/proto/grpc/testing/empty'
+require_relative '../src/proto/grpc/testing/messages'
+require_relative '../src/proto/grpc/testing/test_services'
AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR
@@ -111,6 +111,18 @@ end
# creates a test stub that accesses host:port securely.
def create_stub(opts)
address = "#{opts.host}:#{opts.port}"
+
+ # Provide channel args that request compression by default
+ # for compression interop tests
+ if ['client_compressed_unary',
+ 'client_compressed_streaming'].include?(opts.test_case)
+ compression_options =
+ GRPC::Core::CompressionOptions.new(default_algorithm: :gzip)
+ compression_channel_args = compression_options.to_channel_arg_hash
+ else
+ compression_channel_args = {}
+ end
+
if opts.secure
creds = ssl_creds(opts.use_test_ca)
stub_opts = {
@@ -145,10 +157,15 @@ def create_stub(opts)
end
GRPC.logger.info("... connecting securely to #{address}")
+ stub_opts[:channel_args].merge!(compression_channel_args)
Grpc::Testing::TestService::Stub.new(address, creds, **stub_opts)
else
GRPC.logger.info("... connecting insecurely to #{address}")
- Grpc::Testing::TestService::Stub.new(address, :this_channel_is_insecure)
+ Grpc::Testing::TestService::Stub.new(
+ address,
+ :this_channel_is_insecure,
+ channel_args: compression_channel_args
+ )
end
end
@@ -197,10 +214,47 @@ class PingPongPlayer
end
end
+class BlockingEnumerator
+ include Grpc::Testing
+ include Grpc::Testing::PayloadType
+
+ def initialize(req_size, sleep_time)
+ @req_size = req_size
+ @sleep_time = sleep_time
+ end
+
+ def each_item
+ return enum_for(:each_item) unless block_given?
+ req_cls = StreamingOutputCallRequest
+ req = req_cls.new(payload: Payload.new(body: nulls(@req_size)))
+ yield req
+ # Sleep until after the deadline should have passed
+ sleep(@sleep_time)
+ end
+end
+
+# Intended to be used to wrap a call_op, and to adjust
+# the write flag of the call_op in between messages yielded to it.
+class WriteFlagSettingStreamingInputEnumerable
+ attr_accessor :call_op
+
+ def initialize(requests_and_write_flags)
+ @requests_and_write_flags = requests_and_write_flags
+ end
+
+ def each
+ @requests_and_write_flags.each do |request_and_flag|
+ @call_op.write_flag = request_and_flag[:write_flag]
+ yield request_and_flag[:request]
+ end
+ end
+end
+
# defines methods corresponding to each interop test case.
class NamedTests
include Grpc::Testing
include Grpc::Testing::PayloadType
+ include GRPC::Core::MetadataKeys
def initialize(stub, args)
@stub = stub
@@ -216,6 +270,48 @@ class NamedTests
perform_large_unary
end
+ def client_compressed_unary
+ # first request used also for the probe
+ req_size, wanted_response_size = 271_828, 314_159
+ expect_compressed = BoolValue.new(value: true)
+ payload = Payload.new(type: :COMPRESSABLE, body: nulls(req_size))
+ req = SimpleRequest.new(response_type: :COMPRESSABLE,
+ response_size: wanted_response_size,
+ payload: payload,
+ expect_compressed: expect_compressed)
+
+ # send a probe to see if CompressedResponse is supported on the server
+ send_probe_for_compressed_request_support do
+ request_uncompressed_args = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+ @stub.unary_call(req, metadata: request_uncompressed_args)
+ end
+
+ # make a call with a compressed message
+ resp = @stub.unary_call(req)
+ assert('Expected second unary call with compression to work') do
+ resp.payload.body.length == wanted_response_size
+ end
+
+ # make a call with an uncompressed message
+ stub_options = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+
+ req = SimpleRequest.new(
+ response_type: :COMPRESSABLE,
+ response_size: wanted_response_size,
+ payload: payload,
+ expect_compressed: BoolValue.new(value: false)
+ )
+
+ resp = @stub.unary_call(req, metadata: stub_options)
+ assert('Expected second unary call with compression to work') do
+ resp.payload.body.length == wanted_response_size
+ end
+ end
+
def service_account_creds
# ignore this test if the oauth options are not set
if @args.oauth_scope.nil?
@@ -290,6 +386,50 @@ class NamedTests
end
end
+ def client_compressed_streaming
+ # first request used also by the probe
+ first_request = StreamingInputCallRequest.new(
+ payload: Payload.new(type: :COMPRESSABLE, body: nulls(27_182)),
+ expect_compressed: BoolValue.new(value: true)
+ )
+
+ # send a probe to see if CompressedResponse is supported on the server
+ send_probe_for_compressed_request_support do
+ request_uncompressed_args = {
+ COMPRESSION_REQUEST_ALGORITHM => 'identity'
+ }
+ @stub.streaming_input_call([first_request],
+ metadata: request_uncompressed_args)
+ end
+
+ second_request = StreamingInputCallRequest.new(
+ payload: Payload.new(type: :COMPRESSABLE, body: nulls(45_904)),
+ expect_compressed: BoolValue.new(value: false)
+ )
+
+ # Create the requests messages and the corresponding write flags
+ # for each message
+ requests = WriteFlagSettingStreamingInputEnumerable.new([
+ { request: first_request,
+ write_flag: 0 },
+ { request: second_request,
+ write_flag: GRPC::Core::WriteFlags::NO_COMPRESS }
+ ])
+
+ # Create the call_op, pass it to the requests enumerable, and
+ # run the call
+ call_op = @stub.streaming_input_call(requests,
+ return_op: true)
+ requests.call_op = call_op
+ resp = call_op.execute
+
+ wanted_aggregate_size = 73_086
+
+ assert("#{__callee__}: aggregate payload size is incorrect") do
+ wanted_aggregate_size == resp.aggregated_payload_size
+ end
+ end
+
def server_streaming
msg_sizes = [31_415, 9, 2653, 58_979]
response_spec = msg_sizes.map { |s| ResponseParameters.new(size: s) }
@@ -315,11 +455,10 @@ class NamedTests
end
def timeout_on_sleeping_server
- msg_sizes = [[27_182, 31_415]]
- ppp = PingPongPlayer.new(msg_sizes)
- deadline = GRPC::Core::TimeConsts::from_relative_time(0.001)
- resps = @stub.full_duplex_call(ppp.each_item, deadline: deadline)
- resps.each { |r| ppp.queue.push(r) }
+ enum = BlockingEnumerator.new(27_182, 2)
+ deadline = GRPC::Core::TimeConsts::from_relative_time(1)
+ resps = @stub.full_duplex_call(enum.each_item, deadline: deadline)
+ resps.each { } # wait to receive each request (or timeout)
fail 'Should have raised GRPC::BadStatus(DEADLINE_EXCEEDED)'
rescue GRPC::BadStatus => e
assert("#{__callee__}: status was wrong") do
@@ -351,7 +490,7 @@ class NamedTests
op.execute
fail 'Should have raised GRPC:Cancelled'
rescue GRPC::Cancelled
- assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
+ assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? }
end
def cancel_after_first_response
@@ -362,7 +501,7 @@ class NamedTests
op.execute.each { |r| ppp.queue.push(r) }
fail 'Should have raised GRPC:Cancelled'
rescue GRPC::Cancelled
- assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled }
+ assert("#{__callee__}: call operation should be CANCELLED") { op.cancelled? }
op.wait
end
@@ -397,6 +536,29 @@ class NamedTests
end
resp
end
+
+ # Send probing message for compressed request on the server, to see
+ # if it's implemented.
+ def send_probe_for_compressed_request_support(&send_probe)
+ bad_status_occured = false
+
+ begin
+ send_probe.call
+ rescue GRPC::BadStatus => e
+ if e.code == GRPC::Core::StatusCodes::INVALID_ARGUMENT
+ bad_status_occured = true
+ else
+ fail AssertionError, "Bad status received but code is #{e.code}"
+ end
+ rescue Exception => e
+ fail AssertionError, "Expected BadStatus. Received: #{e.inspect}"
+ end
+
+ assert('CompressedRequest probe failed') do
+ bad_status_occured
+ end
+ end
+
end
# Args is used to hold the command line info.
diff --git a/src/ruby/pb/test/proto/empty.rb b/src/ruby/pb/test/proto/empty.rb
deleted file mode 100644
index 559adcc85e..0000000000
--- a/src/ruby/pb/test/proto/empty.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: test/proto/empty.proto
-
-require 'google/protobuf'
-
-Google::Protobuf::DescriptorPool.generated_pool.build do
- add_message "grpc.testing.Empty" do
- end
-end
-
-module Grpc
- module Testing
- Empty = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Empty").msgclass
- end
-end
diff --git a/src/ruby/pb/test/proto/messages.rb b/src/ruby/pb/test/proto/messages.rb
deleted file mode 100644
index 5222c9824a..0000000000
--- a/src/ruby/pb/test/proto/messages.rb
+++ /dev/null
@@ -1,80 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: test/proto/messages.proto
-
-require 'google/protobuf'
-
-Google::Protobuf::DescriptorPool.generated_pool.build do
- add_message "grpc.testing.Payload" do
- optional :type, :enum, 1, "grpc.testing.PayloadType"
- optional :body, :bytes, 2
- end
- add_message "grpc.testing.EchoStatus" do
- optional :code, :int32, 1
- optional :message, :string, 2
- end
- add_message "grpc.testing.SimpleRequest" do
- optional :response_type, :enum, 1, "grpc.testing.PayloadType"
- optional :response_size, :int32, 2
- optional :payload, :message, 3, "grpc.testing.Payload"
- optional :fill_username, :bool, 4
- optional :fill_oauth_scope, :bool, 5
- optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
- optional :response_status, :message, 7, "grpc.testing.EchoStatus"
- end
- add_message "grpc.testing.SimpleResponse" do
- optional :payload, :message, 1, "grpc.testing.Payload"
- optional :username, :string, 2
- optional :oauth_scope, :string, 3
- end
- add_message "grpc.testing.StreamingInputCallRequest" do
- optional :payload, :message, 1, "grpc.testing.Payload"
- end
- add_message "grpc.testing.StreamingInputCallResponse" do
- optional :aggregated_payload_size, :int32, 1
- end
- add_message "grpc.testing.ResponseParameters" do
- optional :size, :int32, 1
- optional :interval_us, :int32, 2
- end
- add_message "grpc.testing.StreamingOutputCallRequest" do
- optional :response_type, :enum, 1, "grpc.testing.PayloadType"
- repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters"
- optional :payload, :message, 3, "grpc.testing.Payload"
- optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
- optional :response_status, :message, 7, "grpc.testing.EchoStatus"
- end
- add_message "grpc.testing.StreamingOutputCallResponse" do
- optional :payload, :message, 1, "grpc.testing.Payload"
- end
- add_message "grpc.testing.ReconnectInfo" do
- optional :passed, :bool, 1
- repeated :backoff_ms, :int32, 2
- end
- add_enum "grpc.testing.PayloadType" do
- value :COMPRESSABLE, 0
- value :UNCOMPRESSABLE, 1
- value :RANDOM, 2
- end
- add_enum "grpc.testing.CompressionType" do
- value :NONE, 0
- value :GZIP, 1
- value :DEFLATE, 2
- end
-end
-
-module Grpc
- module Testing
- Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass
- EchoStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EchoStatus").msgclass
- SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass
- SimpleResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleResponse").msgclass
- StreamingInputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallRequest").msgclass
- StreamingInputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingInputCallResponse").msgclass
- ResponseParameters = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ResponseParameters").msgclass
- StreamingOutputCallRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallRequest").msgclass
- StreamingOutputCallResponse = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.StreamingOutputCallResponse").msgclass
- ReconnectInfo = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectInfo").msgclass
- PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule
- CompressionType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CompressionType").enummodule
- end
-end
diff --git a/src/ruby/pb/test/proto/test.rb b/src/ruby/pb/test/proto/test.rb
deleted file mode 100644
index 100eb6505c..0000000000
--- a/src/ruby/pb/test/proto/test.rb
+++ /dev/null
@@ -1,14 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# source: test/proto/test.proto
-
-require 'google/protobuf'
-
-require 'test/proto/empty'
-require 'test/proto/messages'
-Google::Protobuf::DescriptorPool.generated_pool.build do
-end
-
-module Grpc
- module Testing
- end
-end
diff --git a/src/ruby/pb/test/proto/test_services.rb b/src/ruby/pb/test/proto/test_services.rb
deleted file mode 100644
index 9df9cc5860..0000000000
--- a/src/ruby/pb/test/proto/test_services.rb
+++ /dev/null
@@ -1,64 +0,0 @@
-# Generated by the protocol buffer compiler. DO NOT EDIT!
-# Source: test/proto/test.proto for package 'grpc.testing'
-
-require 'grpc'
-require 'test/proto/test'
-
-module Grpc
- module Testing
- module TestService
-
- # TODO: add proto service documentation here
- class Service
-
- include GRPC::GenericService
-
- self.marshal_class_method = :encode
- self.unmarshal_class_method = :decode
- self.service_name = 'grpc.testing.TestService'
-
- rpc :EmptyCall, Empty, Empty
- rpc :UnaryCall, SimpleRequest, SimpleResponse
- rpc :StreamingOutputCall, StreamingOutputCallRequest, stream(StreamingOutputCallResponse)
- rpc :StreamingInputCall, stream(StreamingInputCallRequest), StreamingInputCallResponse
- rpc :FullDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse)
- rpc :HalfDuplexCall, stream(StreamingOutputCallRequest), stream(StreamingOutputCallResponse)
- end
-
- Stub = Service.rpc_stub_class
- end
- module UnimplementedService
-
- # TODO: add proto service documentation here
- class Service
-
- include GRPC::GenericService
-
- self.marshal_class_method = :encode
- self.unmarshal_class_method = :decode
- self.service_name = 'grpc.testing.UnimplementedService'
-
- rpc :UnimplementedCall, Empty, Empty
- end
-
- Stub = Service.rpc_stub_class
- end
- module ReconnectService
-
- # TODO: add proto service documentation here
- class Service
-
- include GRPC::GenericService
-
- self.marshal_class_method = :encode
- self.unmarshal_class_method = :decode
- self.service_name = 'grpc.testing.ReconnectService'
-
- rpc :Start, Empty, Empty
- rpc :Stop, Empty, ReconnectInfo
- end
-
- Stub = Service.rpc_stub_class
- end
- end
-end
diff --git a/src/ruby/pb/test/server.rb b/src/ruby/pb/test/server.rb
index 914c7cc79d..11ee3d465d 100755
--- a/src/ruby/pb/test/server.rb
+++ b/src/ruby/pb/test/server.rb
@@ -50,9 +50,9 @@ require 'optparse'
require 'grpc'
-require 'test/proto/empty'
-require 'test/proto/messages'
-require 'test/proto/test_services'
+require_relative '../src/proto/grpc/testing/empty'
+require_relative '../src/proto/grpc/testing/messages'
+require_relative '../src/proto/grpc/testing/test_services'
# DebugIsTruncated extends the default Logger to truncate debug messages
class DebugIsTruncated < Logger
@@ -188,11 +188,13 @@ class TestTarget < Grpc::Testing::TestService::Service
begin
GRPC.logger.info('interop-server: started receiving')
reqs.each do |req|
- resp_size = req.response_parameters[0].size
- GRPC.logger.info("read a req, response size is #{resp_size}")
- resp = cls.new(payload: Payload.new(type: req.response_type,
- body: nulls(resp_size)))
- q.push(resp)
+ req.response_parameters.each do |params|
+ resp_size = params.size
+ GRPC.logger.info("read a req, response size is #{resp_size}")
+ resp = cls.new(payload: Payload.new(type: req.response_type,
+ body: nulls(resp_size)))
+ q.push(resp)
+ end
end
GRPC.logger.info('interop-server: finished receiving')
q.push(self)
diff --git a/src/ruby/qps/src/proto/grpc/testing/messages.rb b/src/ruby/qps/src/proto/grpc/testing/messages.rb
index 2bdfe0eade..e27ccd0dc0 100644
--- a/src/ruby/qps/src/proto/grpc/testing/messages.rb
+++ b/src/ruby/qps/src/proto/grpc/testing/messages.rb
@@ -4,6 +4,9 @@
require 'google/protobuf'
Google::Protobuf::DescriptorPool.generated_pool.build do
+ add_message "grpc.testing.BoolValue" do
+ optional :value, :bool, 1
+ end
add_message "grpc.testing.Payload" do
optional :type, :enum, 1, "grpc.testing.PayloadType"
optional :body, :bytes, 2
@@ -18,8 +21,9 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
optional :payload, :message, 3, "grpc.testing.Payload"
optional :fill_username, :bool, 4
optional :fill_oauth_scope, :bool, 5
- optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
+ optional :response_compressed, :message, 6, "grpc.testing.BoolValue"
optional :response_status, :message, 7, "grpc.testing.EchoStatus"
+ optional :expect_compressed, :message, 8, "grpc.testing.BoolValue"
end
add_message "grpc.testing.SimpleResponse" do
optional :payload, :message, 1, "grpc.testing.Payload"
@@ -28,6 +32,7 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
end
add_message "grpc.testing.StreamingInputCallRequest" do
optional :payload, :message, 1, "grpc.testing.Payload"
+ optional :expect_compressed, :message, 2, "grpc.testing.BoolValue"
end
add_message "grpc.testing.StreamingInputCallResponse" do
optional :aggregated_payload_size, :int32, 1
@@ -35,12 +40,12 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
add_message "grpc.testing.ResponseParameters" do
optional :size, :int32, 1
optional :interval_us, :int32, 2
+ optional :compressed, :message, 3, "grpc.testing.BoolValue"
end
add_message "grpc.testing.StreamingOutputCallRequest" do
optional :response_type, :enum, 1, "grpc.testing.PayloadType"
repeated :response_parameters, :message, 2, "grpc.testing.ResponseParameters"
optional :payload, :message, 3, "grpc.testing.Payload"
- optional :response_compression, :enum, 6, "grpc.testing.CompressionType"
optional :response_status, :message, 7, "grpc.testing.EchoStatus"
end
add_message "grpc.testing.StreamingOutputCallResponse" do
@@ -55,18 +60,12 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
end
add_enum "grpc.testing.PayloadType" do
value :COMPRESSABLE, 0
- value :UNCOMPRESSABLE, 1
- value :RANDOM, 2
- end
- add_enum "grpc.testing.CompressionType" do
- value :NONE, 0
- value :GZIP, 1
- value :DEFLATE, 2
end
end
module Grpc
module Testing
+ BoolValue = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.BoolValue").msgclass
Payload = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.Payload").msgclass
EchoStatus = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.EchoStatus").msgclass
SimpleRequest = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.SimpleRequest").msgclass
@@ -79,6 +78,5 @@ module Grpc
ReconnectParams = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectParams").msgclass
ReconnectInfo = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.ReconnectInfo").msgclass
PayloadType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.PayloadType").enummodule
- CompressionType = Google::Protobuf::DescriptorPool.generated_pool.lookup("grpc.testing.CompressionType").enummodule
end
end
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index ae3ce0748a..1c44b333de 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -96,7 +96,6 @@ describe GRPC::Core::CallOps do
end
describe GRPC::Core::Call do
- let(:client_queue) { GRPC::Core::CompletionQueue.new }
let(:test_tag) { Object.new }
let(:fake_host) { 'localhost:10101' }
@@ -154,7 +153,7 @@ describe GRPC::Core::Call do
end
def make_test_call
- @ch.create_call(client_queue, nil, nil, 'dummy_method', nil, deadline)
+ @ch.create_call(nil, nil, 'dummy_method', nil, deadline)
end
def deadline
diff --git a/src/ruby/spec/channel_spec.rb b/src/ruby/spec/channel_spec.rb
index 355f95c9d7..740eac631a 100644
--- a/src/ruby/spec/channel_spec.rb
+++ b/src/ruby/spec/channel_spec.rb
@@ -37,7 +37,6 @@ end
describe GRPC::Core::Channel do
let(:fake_host) { 'localhost:0' }
- let(:cq) { GRPC::Core::CompletionQueue.new }
def create_test_cert
GRPC::Core::ChannelCredentials.new(load_test_certs[0])
@@ -122,7 +121,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
- ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
+ ch.create_call(nil, nil, 'dummy_method', nil, deadline)
end
expect(&blk).to_not raise_error
end
@@ -133,7 +132,7 @@ describe GRPC::Core::Channel do
deadline = Time.now + 5
blk = proc do
- ch.create_call(cq, nil, nil, 'dummy_method', nil, deadline)
+ ch.create_call(nil, nil, 'dummy_method', nil, deadline)
end
expect(&blk).to raise_error(RuntimeError)
end
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index aedeca272d..d9df0b9ae2 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -34,27 +34,23 @@ include GRPC::Core
shared_context 'setup: tags' do
let(:sent_message) { 'sent message' }
let(:reply_text) { 'the reply' }
- before(:example) do
- @client_tag = Object.new
- @server_tag = Object.new
- end
def deadline
Time.now + 5
end
- def server_allows_client_to_proceed
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ def server_allows_client_to_proceed(metadata = {})
+ recvd_rpc = @server.request_call
expect(recvd_rpc).to_not eq nil
server_call = recvd_rpc.call
- ops = { CallOps::SEND_INITIAL_METADATA => {} }
- svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline, ops)
+ ops = { CallOps::SEND_INITIAL_METADATA => metadata }
+ svr_batch = server_call.run_batch(ops)
expect(svr_batch.send_metadata).to be true
server_call
end
def new_client_call
- @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
+ @ch.create_call(nil, nil, '/method', nil, deadline)
end
end
@@ -91,8 +87,7 @@ shared_examples 'basic GRPC message delivery is OK' do
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
expect(batch_result.send_message).to be true
@@ -101,8 +96,7 @@ shared_examples 'basic GRPC message delivery is OK' do
server_ops = {
CallOps::RECV_MESSAGE => nil
}
- svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
- server_ops)
+ svr_batch = server_call.run_batch(server_ops)
expect(svr_batch.message).to eq(sent_message)
end
@@ -118,8 +112,7 @@ shared_examples 'basic GRPC message delivery is OK' do
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
expect(batch_result.send_message).to be true
@@ -129,12 +122,50 @@ shared_examples 'basic GRPC message delivery is OK' do
CallOps::RECV_MESSAGE => nil,
CallOps::SEND_MESSAGE => reply_text
}
- svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
- server_ops)
+ svr_batch = server_call.run_batch(server_ops)
expect(svr_batch.message).to eq(sent_message)
expect(svr_batch.send_message).to be true
end
+ it 'compressed messages can be sent and received' do
+ call = new_client_call
+ server_call = nil
+ long_request_str = '0' * 2000
+ long_response_str = '1' * 2000
+ md = { 'grpc-internal-encoding-request' => 'gzip' }
+
+ server_thread = Thread.new do
+ server_call = server_allows_client_to_proceed(md)
+ end
+
+ client_ops = {
+ CallOps::SEND_INITIAL_METADATA => md,
+ CallOps::SEND_MESSAGE => long_request_str
+ }
+ batch_result = call.run_batch(client_ops)
+ expect(batch_result.send_metadata).to be true
+ expect(batch_result.send_message).to be true
+
+ # confirm the server can read the inbound message
+ server_thread.join
+ server_ops = {
+ CallOps::RECV_MESSAGE => nil,
+ CallOps::SEND_MESSAGE => long_response_str
+ }
+ svr_batch = server_call.run_batch(server_ops)
+ expect(svr_batch.message).to eq(long_request_str)
+ expect(svr_batch.send_message).to be true
+
+ client_ops = {
+ CallOps::SEND_CLOSE_FROM_CLIENT => nil,
+ CallOps::RECV_INITIAL_METADATA => nil,
+ CallOps::RECV_MESSAGE => nil
+ }
+ batch_result = call.run_batch(client_ops)
+ expect(batch_result.send_close).to be true
+ expect(batch_result.message).to eq long_response_str
+ end
+
it 'servers can ignore a client write and send a status' do
call = new_client_call
server_call = nil
@@ -147,8 +178,7 @@ shared_examples 'basic GRPC message delivery is OK' do
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
expect(batch_result.send_message).to be true
@@ -158,8 +188,7 @@ shared_examples 'basic GRPC message delivery is OK' do
server_ops = {
CallOps::SEND_STATUS_FROM_SERVER => the_status
}
- svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
- server_ops)
+ svr_batch = server_call.run_batch(server_ops)
expect(svr_batch.message).to eq nil
expect(svr_batch.send_status).to be true
end
@@ -176,8 +205,7 @@ shared_examples 'basic GRPC message delivery is OK' do
CallOps::SEND_INITIAL_METADATA => {},
CallOps::SEND_MESSAGE => sent_message
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
expect(batch_result.send_message).to be true
@@ -189,8 +217,7 @@ shared_examples 'basic GRPC message delivery is OK' do
CallOps::SEND_MESSAGE => reply_text,
CallOps::SEND_STATUS_FROM_SERVER => the_status
}
- svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
- server_ops)
+ svr_batch = server_call.run_batch(server_ops)
expect(svr_batch.message).to eq sent_message
expect(svr_batch.send_status).to be true
expect(svr_batch.send_message).to be true
@@ -202,8 +229,7 @@ shared_examples 'basic GRPC message delivery is OK' do
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.send_close).to be true
expect(batch_result.message).to eq reply_text
expect(batch_result.status).to eq the_status
@@ -212,8 +238,7 @@ shared_examples 'basic GRPC message delivery is OK' do
server_ops = {
CallOps::RECV_CLOSE_ON_SERVER => nil
}
- svr_batch = server_call.run_batch(@server_queue, @server_tag, deadline,
- server_ops)
+ svr_batch = server_call.run_batch(server_ops)
expect(svr_batch.send_close).to be true
end
end
@@ -244,8 +269,7 @@ shared_examples 'GRPC metadata delivery works OK' do
CallOps::SEND_INITIAL_METADATA => md
}
blk = proc do
- call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ call.run_batch(client_ops)
end
expect(&blk).to raise_error
end
@@ -255,15 +279,14 @@ shared_examples 'GRPC metadata delivery works OK' do
@valid_metadata.each do |md|
recvd_rpc = nil
rcv_thread = Thread.new do
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
end
call = new_client_call
client_ops = {
CallOps::SEND_INITIAL_METADATA => md
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
# confirm the server can receive the client metadata
@@ -296,7 +319,7 @@ shared_examples 'GRPC metadata delivery works OK' do
@bad_keys.each do |md|
recvd_rpc = nil
rcv_thread = Thread.new do
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
end
call = new_client_call
@@ -305,7 +328,7 @@ shared_examples 'GRPC metadata delivery works OK' do
client_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
- call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+ call.run_batch(client_ops)
# server gets the invocation
rcv_thread.join
@@ -314,8 +337,7 @@ shared_examples 'GRPC metadata delivery works OK' do
CallOps::SEND_INITIAL_METADATA => md
}
blk = proc do
- recvd_rpc.call.run_batch(@server_queue, @server_tag, deadline,
- server_ops)
+ recvd_rpc.call.run_batch(server_ops)
end
expect(&blk).to raise_error
end
@@ -324,7 +346,7 @@ shared_examples 'GRPC metadata delivery works OK' do
it 'sends an empty hash if no metadata is added' do
recvd_rpc = nil
rcv_thread = Thread.new do
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
end
call = new_client_call
@@ -333,7 +355,7 @@ shared_examples 'GRPC metadata delivery works OK' do
client_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
- call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+ call.run_batch(client_ops)
# server gets the invocation but sends no metadata back
rcv_thread.join
@@ -342,14 +364,13 @@ shared_examples 'GRPC metadata delivery works OK' do
server_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
- server_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call.run_batch(server_ops)
# client receives nothing as expected
client_ops = {
CallOps::RECV_INITIAL_METADATA => nil
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.metadata).to eq({})
end
@@ -357,7 +378,7 @@ shared_examples 'GRPC metadata delivery works OK' do
@valid_metadata.each do |md|
recvd_rpc = nil
rcv_thread = Thread.new do
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
end
call = new_client_call
@@ -366,7 +387,7 @@ shared_examples 'GRPC metadata delivery works OK' do
client_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
- call.run_batch(@client_queue, @client_tag, deadline, client_ops)
+ call.run_batch(client_ops)
# server gets the invocation but sends no metadata back
rcv_thread.join
@@ -375,14 +396,13 @@ shared_examples 'GRPC metadata delivery works OK' do
server_ops = {
CallOps::SEND_INITIAL_METADATA => md
}
- server_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call.run_batch(server_ops)
# client receives nothing as expected
client_ops = {
CallOps::RECV_INITIAL_METADATA => nil
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
replace_symbols = Hash[md.each_pair.collect { |x, y| [x.to_s, y] }]
expect(batch_result.metadata).to eq(replace_symbols)
end
@@ -393,9 +413,7 @@ end
describe 'the http client/server' do
before(:example) do
server_host = '0.0.0.0:0'
- @client_queue = GRPC::Core::CompletionQueue.new
- @server_queue = GRPC::Core::CompletionQueue.new
- @server = GRPC::Core::Server.new(@server_queue, nil)
+ @server = GRPC::Core::Server.new(nil)
server_port = @server.add_http2_port(server_host, :this_port_is_insecure)
@server.start
@ch = Channel.new("0.0.0.0:#{server_port}", nil, :this_channel_is_insecure)
@@ -403,7 +421,7 @@ describe 'the http client/server' do
after(:example) do
@ch.close
- @server.close(@server_queue, deadline)
+ @server.close(deadline)
end
it_behaves_like 'basic GRPC message delivery is OK' do
@@ -425,11 +443,9 @@ describe 'the secure http client/server' do
before(:example) do
certs = load_test_certs
server_host = '0.0.0.0:0'
- @client_queue = GRPC::Core::CompletionQueue.new
- @server_queue = GRPC::Core::CompletionQueue.new
server_creds = GRPC::Core::ServerCredentials.new(
nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)
- @server = GRPC::Core::Server.new(@server_queue, nil)
+ @server = GRPC::Core::Server.new(nil)
server_port = @server.add_http2_port(server_host, server_creds)
@server.start
args = { Channel::SSL_TARGET => 'foo.test.google.fr' }
@@ -438,7 +454,7 @@ describe 'the secure http client/server' do
end
after(:example) do
- @server.close(@server_queue, deadline)
+ @server.close(deadline)
end
it_behaves_like 'basic GRPC message delivery is OK' do
@@ -454,7 +470,7 @@ describe 'the secure http client/server' do
expected_md = { 'k1' => 'updated-v1', 'k2' => 'v2' }
recvd_rpc = nil
rcv_thread = Thread.new do
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
end
call = new_client_call
@@ -462,8 +478,7 @@ describe 'the secure http client/server' do
client_ops = {
CallOps::SEND_INITIAL_METADATA => md
}
- batch_result = call.run_batch(@client_queue, @client_tag, deadline,
- client_ops)
+ batch_result = call.run_batch(client_ops)
expect(batch_result.send_metadata).to be true
# confirm the server can receive the client metadata
diff --git a/src/ruby/spec/completion_queue_spec.rb b/src/ruby/spec/completion_queue_spec.rb
deleted file mode 100644
index 886a7f263b..0000000000
--- a/src/ruby/spec/completion_queue_spec.rb
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'grpc'
-
-describe GRPC::Core::CompletionQueue do
- before(:example) do
- @cq = GRPC::Core::CompletionQueue.new
- end
-
- describe '#new' do
- it 'is constructed successufully' do
- expect { GRPC::Core::CompletionQueue.new }.not_to raise_error
- end
- end
-end
diff --git a/src/ruby/spec/compression_options_spec.rb b/src/ruby/spec/compression_options_spec.rb
new file mode 100644
index 0000000000..dbd7e59294
--- /dev/null
+++ b/src/ruby/spec/compression_options_spec.rb
@@ -0,0 +1,164 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'grpc'
+
+describe GRPC::Core::CompressionOptions do
+ # Note these constants should be updated
+ # according to what the core lib provides.
+
+ # Names of supported compression algorithms
+ ALGORITHMS = [:identity, :deflate, :gzip]
+
+ # Names of valid supported compression levels
+ COMPRESS_LEVELS = [:none, :low, :medium, :high]
+
+ it 'implements to_s' do
+ expect { GRPC::Core::CompressionOptions.new.to_s }.to_not raise_error
+ end
+
+ it '#to_channel_arg_hash gives the same result as #to_hash' do
+ options = GRPC::Core::CompressionOptions.new
+ expect(options.to_channel_arg_hash).to eq(options.to_hash)
+ end
+
+ # Test the normal call sequence of creating an instance
+ # and then obtaining the resulting channel-arg hash that
+ # corresponds to the compression settings of the instance
+ describe 'creating, reading, and converting to channel args hash' do
+ it 'works when no optional args were provided' do
+ options = GRPC::Core::CompressionOptions.new
+
+ ALGORITHMS.each do |algorithm|
+ expect(options.algorithm_enabled?(algorithm)).to be true
+ end
+
+ expect(options.disabled_algorithms).to be_empty
+ expect(options.default_algorithm).to be nil
+ expect(options.default_level).to be nil
+ expect(options.to_hash).to be_instance_of(Hash)
+ end
+
+ it 'works when disabling multiple algorithms' do
+ options = GRPC::Core::CompressionOptions.new(
+ default_algorithm: :identity,
+ default_level: :none,
+ disabled_algorithms: [:gzip, :deflate]
+ )
+
+ [:gzip, :deflate].each do |algorithm|
+ expect(options.algorithm_enabled?(algorithm)).to be false
+ expect(options.disabled_algorithms.include?(algorithm)).to be true
+ end
+
+ expect(options.default_algorithm).to be(:identity)
+ expect(options.default_level).to be(:none)
+ expect(options.to_hash).to be_instance_of(Hash)
+ end
+
+ it 'works when all optional args have been set' do
+ options = GRPC::Core::CompressionOptions.new(
+ default_algorithm: :gzip,
+ default_level: :low,
+ disabled_algorithms: [:deflate]
+ )
+
+ expect(options.algorithm_enabled?(:deflate)).to be false
+ expect(options.algorithm_enabled?(:gzip)).to be true
+ expect(options.disabled_algorithms).to eq([:deflate])
+
+ expect(options.default_algorithm).to be(:gzip)
+ expect(options.default_level).to be(:low)
+ expect(options.to_hash).to be_instance_of(Hash)
+ end
+
+ it 'doesnt fail when no algorithms are disabled' do
+ options = GRPC::Core::CompressionOptions.new(
+ default_algorithm: :identity,
+ default_level: :high
+ )
+
+ ALGORITHMS.each do |algorithm|
+ expect(options.algorithm_enabled?(algorithm)).to be(true)
+ end
+
+ expect(options.disabled_algorithms).to be_empty
+ expect(options.default_algorithm).to be(:identity)
+ expect(options.default_level).to be(:high)
+ expect(options.to_hash).to be_instance_of(Hash)
+ end
+ end
+
+ describe '#new with bad parameters' do
+ it 'should fail with more than one parameter' do
+ blk = proc { GRPC::Core::CompressionOptions.new(:gzip, :none) }
+ expect { blk.call }.to raise_error
+ end
+
+ it 'should fail with a non-hash parameter' do
+ blk = proc { GRPC::Core::CompressionOptions.new(:gzip) }
+ expect { blk.call }.to raise_error
+ end
+ end
+
+ describe '#default_algorithm' do
+ it 'returns nil if unset' do
+ options = GRPC::Core::CompressionOptions.new
+ expect(options.default_algorithm).to be(nil)
+ end
+ end
+
+ describe '#default_level' do
+ it 'returns nil if unset' do
+ options = GRPC::Core::CompressionOptions.new
+ expect(options.default_level).to be(nil)
+ end
+ end
+
+ describe '#disabled_algorithms' do
+ it 'returns an empty list if no algorithms were disabled' do
+ options = GRPC::Core::CompressionOptions.new
+ expect(options.disabled_algorithms).to be_empty
+ end
+ end
+
+ describe '#algorithm_enabled?' do
+ [:none, :any, 'gzip', Object.new, 1].each do |name|
+ it "should fail for parameter ${name} of class #{name.class}" do
+ options = GRPC::Core::CompressionOptions.new(
+ disabled_algorithms: [:gzip])
+
+ blk = proc do
+ options.algorithm_enabled?(name)
+ end
+ expect { blk.call }.to raise_error
+ end
+ end
+ end
+end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index d9c9780c93..018580e0df 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -39,13 +39,8 @@ describe GRPC::ActiveCall do
before(:each) do
@pass_through = proc { |x| x }
- @server_tag = Object.new
- @tag = Object.new
-
- @client_queue = GRPC::Core::CompletionQueue.new
- @server_queue = GRPC::Core::CompletionQueue.new
host = '0.0.0.0:0'
- @server = GRPC::Core::Server.new(@server_queue, nil)
+ @server = GRPC::Core::Server.new(nil)
server_port = @server.add_http2_port(host, :this_port_is_insecure)
@server.start
@ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
@@ -53,21 +48,20 @@ describe GRPC::ActiveCall do
end
after(:each) do
- @server.close(@server_queue, deadline)
+ @server.close(deadline)
end
describe 'restricted view methods' do
before(:each) do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- @client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ @client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
end
describe '#multi_req_view' do
it 'exposes a fixed subset of the ActiveCall methods' do
- want = %w(cancelled, deadline, each_remote_read, metadata, shutdown)
+ want = %w(cancelled?, deadline, each_remote_read, metadata, shutdown)
v = @client_call.multi_req_view
want.each do |w|
expect(v.methods.include?(w))
@@ -77,7 +71,7 @@ describe GRPC::ActiveCall do
describe '#single_req_view' do
it 'exposes a fixed subset of the ActiveCall methods' do
- want = %w(cancelled, deadline, metadata, shutdown)
+ want = %w(cancelled?, deadline, metadata, shutdown)
v = @client_call.single_req_view
want.each do |w|
expect(v.methods.include?(w))
@@ -89,46 +83,42 @@ describe GRPC::ActiveCall do
describe '#remote_send' do
it 'allows a client to send a payload to the server' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- @client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ @client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
@client_call.remote_send(msg)
# check that server rpc new was received
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
expect(recvd_rpc).to_not eq nil
recvd_call = recvd_rpc.call
# Accept the call, and verify that the server reads the response ok.
- server_ops = {
- CallOps::SEND_INITIAL_METADATA => {}
- }
- recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
- server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
- @pass_through, deadline)
+ server_call = ActiveCall.new(recvd_call, @pass_through,
+ @pass_through, deadline,
+ metadata_received: true)
expect(server_call.remote_read).to eq(msg)
end
it 'marshals the payload using the marshal func' do
call = make_test_call
- ActiveCall.client_invoke(call, @client_queue)
+ ActiveCall.client_invoke(call)
marshal = proc { |x| 'marshalled:' + x }
- client_call = ActiveCall.new(call, @client_queue, marshal,
- @pass_through, deadline)
+ client_call = ActiveCall.new(call, marshal, @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
# confirm that the message was marshalled
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
- recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
- server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
- @pass_through, deadline)
+ recvd_call.run_batch(server_ops)
+ server_call = ActiveCall.new(recvd_call, @pass_through,
+ @pass_through, deadline,
+ metadata_received: true)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
@@ -136,23 +126,24 @@ describe GRPC::ActiveCall do
TEST_WRITE_FLAGS.each do |f|
it "successfully makes calls with write_flag set to #{f}" do
call = make_test_call
- ActiveCall.client_invoke(call, @client_queue)
+ ActiveCall.client_invoke(call)
marshal = proc { |x| 'marshalled:' + x }
- client_call = ActiveCall.new(call, @client_queue, marshal,
+ client_call = ActiveCall.new(call, marshal,
@pass_through, deadline)
msg = 'message is a string'
client_call.write_flag = f
client_call.remote_send(msg)
# confirm that the message was marshalled
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
server_ops = {
CallOps::SEND_INITIAL_METADATA => nil
}
- recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
- server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
- @pass_through, deadline)
+ recvd_call.run_batch(server_ops)
+ server_call = ActiveCall.new(recvd_call, @pass_through,
+ @pass_through, deadline,
+ metadata_received: true)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
end
@@ -162,8 +153,8 @@ describe GRPC::ActiveCall do
it 'sends metadata to the server when present' do
call = make_test_call
metadata = { k1: 'v1', k2: 'v2' }
- ActiveCall.client_invoke(call, @client_queue, metadata)
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ ActiveCall.client_invoke(call, metadata)
+ recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
expect(recvd_call).to_not be_nil
expect(recvd_rpc.metadata).to_not be_nil
@@ -175,10 +166,9 @@ describe GRPC::ActiveCall do
describe '#remote_read' do
it 'reads the response sent by a server' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -188,10 +178,9 @@ describe GRPC::ActiveCall do
it 'saves no metadata when the server adds no metadata' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -203,10 +192,9 @@ describe GRPC::ActiveCall do
it 'saves metadata add by the server' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
@@ -219,10 +207,9 @@ describe GRPC::ActiveCall do
it 'get a nil msg before a status when an OK status is sent' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
client_call.writes_done(false)
@@ -236,11 +223,10 @@ describe GRPC::ActiveCall do
it 'unmarshals the response using the unmarshal func' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
+ ActiveCall.client_invoke(call)
unmarshal = proc { |x| 'unmarshalled:' + x }
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- unmarshal, deadline,
- metadata_tag: md_tag)
+ client_call = ActiveCall.new(call, @pass_through,
+ unmarshal, deadline)
# confirm the client receives the unmarshalled message
msg = 'message is a string'
@@ -254,17 +240,16 @@ describe GRPC::ActiveCall do
describe '#each_remote_read' do
it 'creates an Enumerator' do
call = make_test_call
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
+ client_call = ActiveCall.new(call, @pass_through,
@pass_through, deadline)
expect(client_call.each_remote_read).to be_a(Enumerator)
end
it 'the returns an enumerator that can read n responses' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
@@ -279,10 +264,9 @@ describe GRPC::ActiveCall do
it 'the returns an enumerator that stops after an OK Status' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
reply = 'server_response'
client_call.remote_send(msg)
@@ -302,10 +286,9 @@ describe GRPC::ActiveCall do
describe '#writes_done' do
it 'finishes ok if the server sends a status response' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
expect { client_call.writes_done(false) }.to_not raise_error
@@ -318,10 +301,9 @@ describe GRPC::ActiveCall do
it 'finishes ok if the server sends an early status response' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -334,10 +316,9 @@ describe GRPC::ActiveCall do
it 'finishes ok if writes_done is true' do
call = make_test_call
- md_tag = ActiveCall.client_invoke(call, @client_queue)
- client_call = ActiveCall.new(call, @client_queue, @pass_through,
- @pass_through, deadline,
- metadata_tag: md_tag)
+ ActiveCall.client_invoke(call)
+ client_call = ActiveCall.new(call, @pass_through,
+ @pass_through, deadline)
msg = 'message is a string'
client_call.remote_send(msg)
server_call = expect_server_to_receive(msg)
@@ -355,17 +336,16 @@ describe GRPC::ActiveCall do
end
def expect_server_to_be_invoked(**kw)
- recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_rpc = @server.request_call
expect(recvd_rpc).to_not eq nil
recvd_call = recvd_rpc.call
- recvd_call.run_batch(@server_queue, @server_tag, deadline,
- CallOps::SEND_INITIAL_METADATA => kw)
- ActiveCall.new(recvd_call, @server_queue, @pass_through,
- @pass_through, deadline)
+ recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)
+ ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline,
+ metadata_received: true, started: true)
end
def make_test_call
- @ch.create_call(@client_queue, nil, nil, '/method', nil, deadline)
+ @ch.create_call(nil, nil, '/method', nil, deadline)
end
def deadline
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 168e7fb791..6034b5419c 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -29,11 +29,14 @@
require 'grpc'
+Thread.abort_on_exception = true
+
def wakey_thread(&blk)
n = GRPC::Notifier.new
t = Thread.new do
blk.call(n)
end
+ t.abort_on_exception = true
n.wait
t
end
@@ -54,15 +57,13 @@ describe 'ClientStub' do
before(:each) do
Thread.abort_on_exception = true
@server = nil
- @server_queue = nil
@method = 'an_rpc_method'
@pass = OK
@fail = INTERNAL
- @cq = GRPC::Core::CompletionQueue.new
end
after(:each) do
- @server.close(@server_queue) unless @server_queue.nil?
+ @server.close(from_relative_time(2)) unless @server.nil?
end
describe '#new' do
@@ -70,7 +71,7 @@ describe 'ClientStub' do
it 'can be created from a host and args' do
opts = { channel_args: { a_channel_arg: 'an_arg' } }
blk = proc do
- GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
+ GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
end
expect(&blk).not_to raise_error
end
@@ -81,7 +82,7 @@ describe 'ClientStub' do
channel_override: @ch
}
blk = proc do
- GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
+ GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
end
expect(&blk).not_to raise_error
end
@@ -92,7 +93,7 @@ describe 'ClientStub' do
channel_args: { a_channel_arg: 'an_arg' },
channel_override: Object.new
}
- GRPC::ClientStub.new(fake_host, @cq, :this_channel_is_insecure, **opts)
+ GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
end
expect(&blk).to raise_error
end
@@ -100,7 +101,7 @@ describe 'ClientStub' do
it 'cannot be created with bad credentials' do
blk = proc do
opts = { channel_args: { a_channel_arg: 'an_arg' } }
- GRPC::ClientStub.new(fake_host, @cq, Object.new, **opts)
+ GRPC::ClientStub.new(fake_host, Object.new, **opts)
end
expect(&blk).to raise_error
end
@@ -115,7 +116,7 @@ describe 'ClientStub' do
}
}
creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
- GRPC::ClientStub.new(fake_host, @cq, creds, **opts)
+ GRPC::ClientStub.new(fake_host, creds, **opts)
end
expect(&blk).to_not raise_error
end
@@ -130,7 +131,7 @@ describe 'ClientStub' do
it 'should send a request to/receive a reply from a server' do
server_port = create_test_server
th = run_request_response(@sent_msg, @resp, @pass)
- stub = GRPC::ClientStub.new("localhost:#{server_port}", @cq,
+ stub = GRPC::ClientStub.new("localhost:#{server_port}",
:this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp)
th.join
@@ -141,7 +142,7 @@ describe 'ClientStub' do
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
k1: 'v1', k2: 'v2')
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp)
th.join
end
@@ -151,7 +152,7 @@ describe 'ClientStub' do
alt_host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass)
ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
- stub = GRPC::ClientStub.new('ignored-host', @cq,
+ stub = GRPC::ClientStub.new('ignored-host',
:this_channel_is_insecure,
channel_override: ch)
expect(get_response(stub)).to eq(@resp)
@@ -162,7 +163,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @fail)
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
blk = proc { get_response(stub) }
expect(&blk).to raise_error(GRPC::BadStatus)
th.join
@@ -182,7 +183,8 @@ describe 'ClientStub' do
def get_response(stub)
op = stub.request_response(@method, @sent_msg, noop, noop,
return_op: true,
- metadata: { k1: 'v1', k2: 'v2' })
+ metadata: { k1: 'v1', k2: 'v2' },
+ deadline: from_relative_time(2))
expect(op).to be_a(GRPC::ActiveCall::Operation)
op.execute
end
@@ -196,7 +198,7 @@ describe 'ClientStub' do
before(:each) do
server_port = create_test_server
host = "localhost:#{server_port}"
- @stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+ @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
@metadata = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
@@ -262,7 +264,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @pass)
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_responses(stub).collect { |r| r }).to eq(@replys)
th.join
end
@@ -271,7 +273,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail)
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
@@ -282,7 +284,7 @@ describe 'ClientStub' do
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
k1: 'v1', k2: 'v2')
- stub = GRPC::ClientStub.new(host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
@@ -327,7 +329,7 @@ describe 'ClientStub' do
it 'supports sending all the requests first', bidi: true do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
- stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@replys)
th.join
@@ -335,7 +337,7 @@ describe 'ClientStub' do
it 'supports client-initiated ping pong', bidi: true do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
- stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
@@ -343,7 +345,7 @@ describe 'ClientStub' do
it 'supports a server-initiated ping pong', bidi: true do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
- stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
@@ -372,26 +374,6 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming'
end
-
- describe 'without enough time to run' do
- before(:each) do
- @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
- @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
- server_port = create_test_server
- @host = "localhost:#{server_port}"
- end
-
- it 'should fail with DeadlineExceeded', bidi: true do
- @server.start
- stub = GRPC::ClientStub.new(@host, @cq, :this_channel_is_insecure)
- blk = proc do
- e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
- deadline: from_relative_time(0.001))
- e.collect { |r| r }
- end
- expect(&blk).to raise_error GRPC::BadStatus, /Deadline Exceeded/
- end
- end
end
def run_server_streamer(expected_input, replys, status, **kw)
@@ -460,21 +442,18 @@ describe 'ClientStub' do
end
def create_test_server
- @server_queue = GRPC::Core::CompletionQueue.new
- @server = GRPC::Core::Server.new(@server_queue, nil)
+ @server = GRPC::Core::Server.new(nil)
@server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end
def expect_server_to_be_invoked(notifier)
@server.start
notifier.notify(nil)
- server_tag = Object.new
- recvd_rpc = @server.request_call(@server_queue, server_tag,
- INFINITE_FUTURE)
+ recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
recvd_call.metadata = recvd_rpc.metadata
- recvd_call.run_batch(@server_queue, server_tag, Time.now + 2,
- SEND_INITIAL_METADATA => nil)
- GRPC::ActiveCall.new(recvd_call, @server_queue, noop, noop, INFINITE_FUTURE)
+ recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
+ GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
+ metadata_received: true)
end
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 943502cea2..31157cf161 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -95,7 +95,7 @@ class FailingService
def initialize(_default_var = 'ignored')
@details = 'app error'
@code = 101
- @md = { failed_method: 'an_rpc' }
+ @md = { 'failed_method' => 'an_rpc' }
end
def an_rpc(_req, _call)
@@ -135,8 +135,6 @@ describe GRPC::RpcServer do
@pass = 0
@fail = 1
@noop = proc { |x| x }
-
- @server_queue = GRPC::Core::CompletionQueue.new
end
describe '#new' do
@@ -148,28 +146,6 @@ describe GRPC::RpcServer do
expect(&blk).not_to raise_error
end
- it 'can be created with a completion queue override' do
- opts = {
- server_args: { a_channel_arg: 'an_arg' },
- completion_queue_override: @server_queue
- }
- blk = proc do
- RpcServer.new(**opts)
- end
- expect(&blk).not_to raise_error
- end
-
- it 'cannot be created with a bad completion queue override' do
- blk = proc do
- opts = {
- server_args: { a_channel_arg: 'an_arg' },
- completion_queue_override: Object.new
- }
- RpcServer.new(**opts)
- end
- expect(&blk).to raise_error
- end
-
it 'cannot be created with invalid ServerCredentials' do
blk = proc do
opts = {
@@ -294,7 +270,6 @@ describe GRPC::RpcServer do
context 'with no connect_metadata' do
before(:each) do
server_opts = {
- completion_queue_override: @server_queue,
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
@@ -309,8 +284,7 @@ describe GRPC::RpcServer do
@srv.wait_till_running
req = EchoMsg.new
blk = proc do
- cq = GRPC::Core::CompletionQueue.new
- stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure,
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
**client_opts)
stub.request_response('/unknown', req, marshal, unmarshal)
end
@@ -325,8 +299,7 @@ describe GRPC::RpcServer do
@srv.wait_till_running
req = EchoMsg.new
blk = proc do
- cq = GRPC::Core::CompletionQueue.new
- stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure,
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
**client_opts)
stub.request_response('/an_rpc', req, marshal, unmarshal)
end
@@ -422,7 +395,6 @@ describe GRPC::RpcServer do
it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
opts = {
server_args: { a_channel_arg: 'an_arg' },
- completion_queue_override: @server_queue,
pool_size: 1,
poll_period: 1,
max_waiting_requests: 0
@@ -466,7 +438,6 @@ describe GRPC::RpcServer do
end
before(:each) do
server_opts = {
- completion_queue_override: @server_queue,
poll_period: 1,
connect_md_proc: test_md_proc
}
@@ -502,7 +473,6 @@ describe GRPC::RpcServer do
context 'with trailing metadata' do
before(:each) do
server_opts = {
- completion_queue_override: @server_queue,
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
@@ -545,7 +515,7 @@ describe GRPC::RpcServer do
op = stub.an_rpc(req, return_op: true, metadata: { k1: 'v1', k2: 'v2' })
expect(op.metadata).to be nil
expect(op.execute).to be_a(EchoMsg)
- expect(op.metadata).to eq(wanted_trailers)
+ expect(op.trailing_metadata).to eq(wanted_trailers)
@srv.stop
t.join
end
diff --git a/src/ruby/spec/pb/health/checker_spec.rb b/src/ruby/spec/pb/health/checker_spec.rb
index f3d121a31e..de11c9fedf 100644
--- a/src/ruby/spec/pb/health/checker_spec.rb
+++ b/src/ruby/spec/pb/health/checker_spec.rb
@@ -168,11 +168,9 @@ describe Grpc::Health::Checker do
CheckerStub = Grpc::Health::Checker.rpc_stub_class
before(:each) do
- @server_queue = GRPC::Core::CompletionQueue.new
server_host = '0.0.0.0:0'
@client_opts = { channel_override: @ch }
server_opts = {
- completion_queue_override: @server_queue,
poll_period: 1
}
@srv = RpcServer.new(**server_opts)
diff --git a/src/ruby/spec/server_spec.rb b/src/ruby/spec/server_spec.rb
index 439b19fb8d..003d8f69d5 100644
--- a/src/ruby/spec/server_spec.rb
+++ b/src/ruby/spec/server_spec.rb
@@ -43,19 +43,15 @@ describe Server do
GRPC::Core::ServerCredentials.new(*load_test_certs)
end
- before(:each) do
- @cq = GRPC::Core::CompletionQueue.new
- end
-
describe '#start' do
it 'runs without failing' do
- blk = proc { Server.new(@cq, nil).start }
+ blk = proc { Server.new(nil).start }
expect(&blk).to_not raise_error
end
it 'fails if the server is closed' do
- s = Server.new(@cq, nil)
- s.close(@cq)
+ s = Server.new(nil)
+ s.close
expect { s.start }.to raise_error(RuntimeError)
end
end
@@ -63,19 +59,19 @@ describe Server do
describe '#destroy' do
it 'destroys a server ok' do
s = start_a_server
- blk = proc { s.destroy(@cq) }
+ blk = proc { s.destroy }
expect(&blk).to_not raise_error
end
it 'can be called more than once without error' do
s = start_a_server
begin
- blk = proc { s.destroy(@cq) }
+ blk = proc { s.destroy }
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
ensure
- s.close(@cq)
+ s.close
end
end
end
@@ -84,7 +80,7 @@ describe Server do
it 'closes a server ok' do
s = start_a_server
begin
- blk = proc { s.close(@cq) }
+ blk = proc { s.close }
expect(&blk).to_not raise_error
ensure
s.close(@cq)
@@ -93,7 +89,7 @@ describe Server do
it 'can be called more than once without error' do
s = start_a_server
- blk = proc { s.close(@cq) }
+ blk = proc { s.close }
expect(&blk).to_not raise_error
blk.call
expect(&blk).to_not raise_error
@@ -104,16 +100,16 @@ describe Server do
describe 'for insecure servers' do
it 'runs without failing' do
blk = proc do
- s = Server.new(@cq, nil)
+ s = Server.new(nil)
s.add_http2_port('localhost:0', :this_port_is_insecure)
- s.close(@cq)
+ s.close
end
expect(&blk).to_not raise_error
end
it 'fails if the server is closed' do
- s = Server.new(@cq, nil)
- s.close(@cq)
+ s = Server.new(nil)
+ s.close
blk = proc do
s.add_http2_port('localhost:0', :this_port_is_insecure)
end
@@ -125,16 +121,16 @@ describe Server do
let(:cert) { create_test_cert }
it 'runs without failing' do
blk = proc do
- s = Server.new(@cq, nil)
+ s = Server.new(nil)
s.add_http2_port('localhost:0', cert)
- s.close(@cq)
+ s.close
end
expect(&blk).to_not raise_error
end
it 'fails if the server is closed' do
- s = Server.new(@cq, nil)
- s.close(@cq)
+ s = Server.new(nil)
+ s.close
blk = proc { s.add_http2_port('localhost:0', cert) }
expect(&blk).to raise_error(RuntimeError)
end
@@ -142,8 +138,8 @@ describe Server do
end
shared_examples '#new' do
- it 'takes a completion queue with nil channel args' do
- expect { Server.new(@cq, nil) }.to_not raise_error
+ it 'takes nil channel args' do
+ expect { Server.new(nil) }.to_not raise_error
end
it 'does not take a hash with bad keys as channel args' do
@@ -194,14 +190,14 @@ describe Server do
describe '#new with an insecure channel' do
def construct_with_args(a)
- proc { Server.new(@cq, a) }
+ proc { Server.new(a) }
end
it_behaves_like '#new'
end
def start_a_server
- s = Server.new(@cq, nil)
+ s = Server.new(nil)
s.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
s.start
s
diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb
index dca7fd7e72..e457ec09dd 100644
--- a/src/ruby/tools/version.rb
+++ b/src/ruby/tools/version.rb
@@ -29,6 +29,6 @@
module GRPC
module Tools
- VERSION = '0.15.0.dev'
+ VERSION = '1.1.0.dev'
end
end