diff options
author | murgatroid99 <mlumish@google.com> | 2016-06-06 15:37:45 -0700 |
---|---|---|
committer | murgatroid99 <mlumish@google.com> | 2016-06-06 15:37:45 -0700 |
commit | ec1588ba87d665cf629a8893d6070688a73ea7ef (patch) | |
tree | edae78ea7c15c266497b63b4146c66f138bf3422 /src/ruby/ext/grpc/rb_call.c | |
parent | cc0f4e1c807090c97aa64f83649013e0e6ab879b (diff) |
Ruby: Moved completion queue entirely into extension code
Diffstat (limited to 'src/ruby/ext/grpc/rb_call.c')
-rw-r--r-- | src/ruby/ext/grpc/rb_call.c | 100 |
1 files changed, 46 insertions, 54 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index b436057c16..f2c567c7da 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -63,23 +63,10 @@ static VALUE grpc_rb_sBatchResult; * grpc_metadata_array. */ static VALUE grpc_rb_cMdAry; -/* id_cq is the name of the hidden ivar that preserves a reference to a - * completion queue */ -static ID id_cq; - -/* id_flags is the name of the hidden ivar that preserves the value of - * the flags used to create metadata from a Hash */ -static ID id_flags; - /* id_credentials is the name of the hidden ivar that preserves the value * of the credentials added to the call */ static ID id_credentials; -/* id_input_md is the name of the hidden ivar that preserves the hash used to - * create metadata, so that references to the strings it contains last as long - * as the call the metadata is added to. */ -static ID id_input_md; - /* id_metadata is name of the attribute used to access the metadata hash * received by the call and subsequently saved on it. */ static ID id_metadata; @@ -101,14 +88,23 @@ static VALUE sym_message; static VALUE sym_status; static VALUE sym_cancelled; +typedef struct grpc_rb_call { + grpc_call *wrapped; + grpc_completion_queue *queue; +} grpc_rb_call; + +static void destroy_call(grpc_rb_call *call) { + call = (grpc_rb_call *)p; + grpc_call_destroy(call->wrapped); + grpc_rb_completion_queue_destroy(call->queue); +} + /* Destroys a Call. */ static void grpc_rb_call_destroy(void *p) { - grpc_call* call = NULL; if (p == NULL) { return; } - call = (grpc_call *)p; - grpc_call_destroy(call); + destroy_call((grpc_rb_call*)p); } static size_t md_ary_datasize(const void *p) { @@ -167,15 +163,15 @@ const char *grpc_call_error_detail_of(grpc_call_error err) { /* Called by clients to cancel an RPC on the server. Can be called multiple times, from any thread. */ static VALUE grpc_rb_call_cancel(VALUE self) { - grpc_call *call = NULL; + grpc_rb_call *call = NULL; grpc_call_error err; if (RTYPEDDATA_DATA(self) == NULL) { //This call has been closed return Qnil; } - TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); - err = grpc_call_cancel(call, NULL); + TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); + err = grpc_call_cancel(call->wrapped, NULL); if (err != GRPC_CALL_OK) { rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)", grpc_call_error_detail_of(err), err); @@ -189,10 +185,10 @@ static VALUE grpc_rb_call_cancel(VALUE self) { processed. */ static VALUE grpc_rb_call_close(VALUE self) { - grpc_call *call = NULL; - TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); + grpc_rb_call *call = NULL; + TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); if(call != NULL) { - grpc_call_destroy(call); + destroy_call(call); RTYPEDDATA_DATA(self) = NULL; } return Qnil; @@ -201,14 +197,14 @@ static VALUE grpc_rb_call_close(VALUE self) { /* Called to obtain the peer that this call is connected to. */ static VALUE grpc_rb_call_get_peer(VALUE self) { VALUE res = Qnil; - grpc_call *call = NULL; + grpc_rb_call *call = NULL; char *peer = NULL; if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call"); return Qnil; } - TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); - peer = grpc_call_get_peer(call); + TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); + peer = grpc_call_get_peer(call->wrapped); res = rb_str_new2(peer); gpr_free(peer); @@ -217,16 +213,16 @@ static VALUE grpc_rb_call_get_peer(VALUE self) { /* Called to obtain the x509 cert of an authenticated peer. */ static VALUE grpc_rb_call_get_peer_cert(VALUE self) { - grpc_call *call = NULL; + grpc_rb_call *call = NULL; VALUE res = Qnil; grpc_auth_context *ctx = NULL; if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call"); return Qnil; } - TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); + TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); - ctx = grpc_call_auth_context(call); + ctx = grpc_call_auth_context(call->wrapped); if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) { return Qnil; @@ -326,21 +322,23 @@ static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) { Sets credentials on a call */ static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) { - grpc_call *call = NULL; + grpc_rb_call *call = NULL; grpc_call_credentials *creds; grpc_call_error err; if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call"); return Qnil; } - TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); + TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); creds = grpc_rb_get_wrapped_call_credentials(credentials); - err = grpc_call_set_credentials(call, creds); + err = grpc_call_set_credentials(call->wrapped, creds); if (err != GRPC_CALL_OK) { rb_raise(grpc_rb_eCallError, "grpc_call_set_credentials failed with %s (code=%d)", grpc_call_error_detail_of(err), err); } + /* We need the credentials to be alive for as long as the call is alive, + but we don't care about destruction order. */ rb_ivar_set(self, id_credentials, credentials); return Qnil; } @@ -733,7 +731,6 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { } /* call-seq: - cq = CompletionQueue.new ops = { GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>, GRPC::Core::CallOps::SEND_MESSAGE => <op_value>, @@ -741,7 +738,7 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { } tag = Object.new timeout = 10 - call.start_batch(cq, tag, timeout, ops) + call.start_batch(tag, timeout, ops) Start a batch of operations defined in the array ops; when complete, post a completion of type 'tag' to the completion queue bound to the call. @@ -750,20 +747,20 @@ static VALUE grpc_run_batch_stack_build_result(run_batch_stack *st) { The order of ops specified in the batch has no significance. Only one operation of each type can be active at once in any given batch */ -static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, - VALUE timeout, VALUE ops_hash) { +static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) { run_batch_stack st; - grpc_call *call = NULL; + grpc_rb_call *call = NULL; grpc_event ev; grpc_call_error err; VALUE result = Qnil; VALUE rb_write_flag = rb_ivar_get(self, id_write_flag); unsigned write_flag = 0; + void *tag = (void*)&st; if (RTYPEDDATA_DATA(self) == NULL) { rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call"); return Qnil; } - TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); + TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call); /* Validate the ops args, adding them to a ruby array */ if (TYPE(ops_hash) != T_HASH) { @@ -778,7 +775,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, /* call grpc_call_start_batch, then wait for it to complete using * pluck_event */ - err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag), NULL); + err = grpc_call_start_batch(call->wrapped, st.ops, st.op_num, tag, NULL); if (err != GRPC_CALL_OK) { grpc_run_batch_stack_cleanup(&st); rb_raise(grpc_rb_eCallError, @@ -786,12 +783,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, grpc_call_error_detail_of(err), err); return Qnil; } - ev = grpc_rb_completion_queue_pluck_event(cqueue, tag, timeout); - if (ev.type == GRPC_QUEUE_TIMEOUT) { - grpc_run_batch_stack_cleanup(&st); - rb_raise(grpc_rb_eOutOfTime, "grpc_call_start_batch timed out"); - return Qnil; - } + ev = grpc_rb_completion_queue_pluck(call->queue, tag, gpr_inf_future, NULL); /* Build and return the BatchResult struct result, if there is an error, it's reflected in the status */ @@ -900,7 +892,7 @@ void Init_grpc_call() { 1); /* Add ruby analogues of the Call methods. */ - rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 4); + rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1); rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0); rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0); rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0); @@ -921,9 +913,6 @@ void Init_grpc_call() { id_write_flag = rb_intern("write_flag"); /* Ids used by the c wrapping internals. */ - id_cq = rb_intern("__cq"); - id_flags = rb_intern("__flags"); - id_input_md = rb_intern("__input_md"); id_credentials = rb_intern("__credentials"); /* Ids used in constructing the batch result. */ @@ -947,15 +936,18 @@ void Init_grpc_call() { /* Gets the call from the ruby object */ grpc_call *grpc_rb_get_wrapped_call(VALUE v) { - grpc_call *c = NULL; - TypedData_Get_Struct(v, grpc_call, &grpc_call_data_type, c); - return c; + grpc_rb_call *call = NULL; + TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call); + return call->wrapped; } /* Obtains the wrapped object for a given call */ -VALUE grpc_rb_wrap_call(grpc_call *c) { - if (c == NULL) { +VALUE grpc_rb_wrap_call(grpc_call *c, grpc_completion_queue *q) { + if (c == NULL || q == NULL) { return Qnil; } - return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, c); + grpc_rb_call *wrapper = ALLOC(grpc_rb_call); + wrapper->wrapped = c; + wrapper->queue = q; + return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper); } |