aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Tim Emiola <temiola@google.com>2015-08-20 13:12:33 -0700
committerGravatar Tim Emiola <temiola@google.com>2015-08-20 13:12:33 -0700
commit7840a5573678880ce71398fdab5948128992bfc7 (patch)
tree3a1c2c5c21b43323b9c9ffb6087c7d0dfb2ea8f7 /src
parente710e2bb999bcbfb6fd9a9866d1380d74840077b (diff)
Adds support for per message compression
Diffstat (limited to 'src')
-rw-r--r--src/ruby/ext/grpc/rb_call.c57
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb5
-rw-r--r--src/ruby/spec/call_spec.rb8
-rw-r--r--src/ruby/spec/generic/active_call_spec.rb28
4 files changed, 92 insertions, 6 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 36c6818a7e..6b5beb6f5d 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -82,6 +82,10 @@ static ID id_metadata;
* received by the call and subsequently saved on it. */
static ID id_status;
+/* id_write_flag is name of the attribute used to access the write_flag
+ * saved on the call. */
+static ID id_write_flag;
+
/* sym_* are the symbol for attributes of grpc_rb_sBatchResult. */
static VALUE sym_send_message;
static VALUE sym_send_metadata;
@@ -240,6 +244,30 @@ static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
return rb_ivar_set(self, id_metadata, metadata);
}
+/*
+ call-seq:
+ write_flag = call.write_flag
+
+ Gets the write_flag value saved the call. */
+static VALUE grpc_rb_call_get_write_flag(VALUE self) {
+ return rb_ivar_get(self, id_write_flag);
+}
+
+/*
+ call-seq:
+ call.write_flag = write_flag
+
+ Saves the write_flag on the call. */
+static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
+ if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
+ rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
+ rb_obj_classname(write_flag));
+ return Qnil;
+ }
+
+ return rb_ivar_set(self, id_write_flag, write_flag);
+}
+
/* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
to fill grpc_metadata_array.
@@ -437,17 +465,19 @@ typedef struct run_batch_stack {
grpc_status_code recv_status;
char *recv_status_details;
size_t recv_status_details_capacity;
+ uint write_flag;
} run_batch_stack;
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
-static void grpc_run_batch_stack_init(run_batch_stack *st) {
+static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
grpc_metadata_array_init(&st->recv_metadata);
grpc_metadata_array_init(&st->recv_trailing_metadata);
st->op_num = 0;
+ st->write_flag = write_flag;
}
/* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
@@ -477,6 +507,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
this_op = rb_ary_entry(ops_ary, i);
this_value = rb_hash_aref(ops_hash, this_op);
+ st->ops[st->op_num].flags = 0;
switch (NUM2INT(this_op)) {
case GRPC_OP_SEND_INITIAL_METADATA:
/* N.B. later there is no need to explicitly delete the metadata keys
@@ -490,6 +521,7 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
case GRPC_OP_SEND_MESSAGE:
st->ops[st->op_num].data.send_message = grpc_rb_s_to_byte_buffer(
RSTRING_PTR(this_value), RSTRING_LEN(this_value));
+ st->ops[st->op_num].flags = st->write_flag;
break;
case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
break;
@@ -525,7 +557,6 @@ static void grpc_run_batch_stack_fill_ops(run_batch_stack *st, VALUE ops_hash) {
NUM2INT(this_op));
};
st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
- st->ops[st->op_num].flags = 0;
st->ops[st->op_num].reserved = NULL;
st->op_num++;
}
@@ -604,6 +635,8 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_event ev;
grpc_call_error err;
VALUE result = Qnil;
+ VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
+ uint write_flag = 0;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
@@ -611,7 +644,10 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
return Qnil;
}
- grpc_run_batch_stack_init(&st);
+ if (rb_write_flag != Qnil) {
+ write_flag = NUM2UINT(rb_write_flag);
+ }
+ grpc_run_batch_stack_init(&st, write_flag);
grpc_run_batch_stack_fill_ops(&st, ops_hash);
/* call grpc_call_start_batch, then wait for it to complete using
@@ -638,6 +674,16 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
return result;
}
+static void Init_grpc_write_flags() {
+ /* Constants representing the write flags in grpc.h */
+ VALUE grpc_rb_mWriteFlags =
+ rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
+ rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
+ UINT2NUM(GRPC_WRITE_BUFFER_HINT));
+ rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
+ UINT2NUM(GRPC_WRITE_NO_COMPRESS));
+}
+
static void Init_grpc_error_codes() {
/* Constants representing the error codes of grpc_call_error in grpc.h */
VALUE grpc_rb_mRpcErrors =
@@ -735,10 +781,14 @@ void Init_grpc_call() {
rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
+ rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
+ rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
+ 1);
/* Ids used to support call attributes */
id_metadata = rb_intern("metadata");
id_status = rb_intern("status");
+ id_write_flag = rb_intern("write_flag");
/* Ids used by the c wrapping internals. */
id_cq = rb_intern("__cq");
@@ -766,6 +816,7 @@ void Init_grpc_call() {
Init_grpc_error_codes();
Init_grpc_op_codes();
+ Init_grpc_write_flags();
}
/* Gets the call from the ruby object */
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 17da401c6b..d9cb924735 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -59,7 +59,7 @@ module GRPC
include Core::CallOps
extend Forwardable
attr_reader(:deadline)
- def_delegators :@call, :cancel, :metadata
+ def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=
# client_invoke begins a client invocation.
#
@@ -484,6 +484,7 @@ module GRPC
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled, :deadline, :execute,
- :metadata, :status, :start_call, :wait)
+ :metadata, :status, :start_call, :wait, :write_flag,
+ :write_flag=)
end
end
diff --git a/src/ruby/spec/call_spec.rb b/src/ruby/spec/call_spec.rb
index 3c5d33ffcd..dd3c45f754 100644
--- a/src/ruby/spec/call_spec.rb
+++ b/src/ruby/spec/call_spec.rb
@@ -31,6 +31,14 @@ require 'grpc'
include GRPC::Core::StatusCodes
+describe GRPC::Core::WriteFlags do
+ it 'should define the known write flag values' do
+ m = GRPC::Core::WriteFlags
+ expect(m.const_get(:BUFFER_HINT)).to_not be_nil
+ expect(m.const_get(:NO_COMPRESS)).to_not be_nil
+ end
+end
+
describe GRPC::Core::RpcErrors do
before(:each) do
@known_types = {
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 26208b714a..fcd7bd082f 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -35,6 +35,7 @@ describe GRPC::ActiveCall do
ActiveCall = GRPC::ActiveCall
Call = GRPC::Core::Call
CallOps = GRPC::Core::CallOps
+ WriteFlags = GRPC::Core::WriteFlags
before(:each) do
@pass_through = proc { |x| x }
@@ -129,6 +130,31 @@ describe GRPC::ActiveCall do
@pass_through, deadline)
expect(server_call.remote_read).to eq('marshalled:' + msg)
end
+
+ TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
+ TEST_WRITE_FLAGS.each do |f|
+ it "successfully makes calls with write_flag set to #{f}" do
+ call = make_test_call
+ ActiveCall.client_invoke(call, @client_queue)
+ marshal = proc { |x| 'marshalled:' + x }
+ client_call = ActiveCall.new(call, @client_queue, marshal,
+ @pass_through, deadline)
+ msg = 'message is a string'
+ client_call.write_flag = f
+ client_call.remote_send(msg)
+
+ # confirm that the message was marshalled
+ recvd_rpc = @server.request_call(@server_queue, @server_tag, deadline)
+ recvd_call = recvd_rpc.call
+ server_ops = {
+ CallOps::SEND_INITIAL_METADATA => nil
+ }
+ recvd_call.run_batch(@server_queue, @server_tag, deadline, server_ops)
+ server_call = ActiveCall.new(recvd_call, @server_queue, @pass_through,
+ @pass_through, deadline)
+ expect(server_call.remote_read).to eq('marshalled:' + msg)
+ end
+ end
end
describe '#client_invoke' do
@@ -261,7 +287,7 @@ describe GRPC::ActiveCall do
client_call.writes_done(false)
server_call = expect_server_to_receive(msg)
e = client_call.each_remote_read
- n = 3 # arbitrary value > 1
+ n = 3 # arbitrary value > 1
n.times do
server_call.remote_send(reply)
expect(e.next).to eq(reply)