From 48b36b5bbf1a2c8a9276742075241f9a1f67743c Mon Sep 17 00:00:00 2001 From: Tim Emiola Date: Sat, 28 Mar 2015 15:15:03 -0700 Subject: Updates Server#request_call in line with the new API --- src/ruby/ext/grpc/rb_server.c | 85 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 80 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index c54f02e87a..9a49285db2 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -46,6 +46,9 @@ /* rb_cServer is the ruby class that proxies grpc_server. */ VALUE rb_cServer = Qnil; +/* id_at is the constructor method of the ruby standard Time class. */ +static ID id_at; + /* grpc_rb_server wraps a grpc_server. It provides a peer ruby object, 'mark' to minimize copying when a server is created from ruby. */ typedef struct grpc_rb_server { @@ -152,18 +155,89 @@ static VALUE grpc_rb_server_init_copy(VALUE copy, VALUE orig) { return copy; } -static VALUE grpc_rb_server_request_call(VALUE self, VALUE tag_new) { - grpc_call_error err; +/* request_call_stack holds various values used by the + * grpc_rb_server_request_call function */ +typedef struct request_call_stack { + grpc_call_details details; + grpc_metadata_array md_ary; +} request_call_stack; + +/* grpc_request_call_stack_init ensures the request_call_stack is properly + * initialized */ +static void grpc_request_call_stack_init(request_call_stack* st) { + MEMZERO(st, request_call_stack, 1); + grpc_metadata_array_init(&st->md_ary); + grpc_call_details_init(&st->details); + st->details.method = NULL; + st->details.host = NULL; +} + +/* grpc_request_call_stack_cleanup ensures the request_call_stack is properly + * cleaned up */ +static void grpc_request_call_stack_cleanup(request_call_stack* st) { + grpc_metadata_array_destroy(&st->md_ary); + grpc_call_details_destroy(&st->details); +} + +/* call-seq: + cq = CompletionQueue.new + tag = Object.new + timeout = 10 + server.request_call(cqueue, tag, timeout) + + Requests notification of a new call on a server. */ +static VALUE grpc_rb_server_request_call(VALUE self, VALUE cqueue, + VALUE tag_new, VALUE timeout) { grpc_rb_server *s = NULL; + grpc_call *call = NULL; + grpc_event *ev = NULL; + grpc_call_error err; + request_call_stack st; + VALUE result; Data_Get_Struct(self, grpc_rb_server, s); if (s->wrapped == NULL) { rb_raise(rb_eRuntimeError, "closed!"); + return Qnil; } else { - err = grpc_server_request_call_old(s->wrapped, ROBJECT(tag_new)); + grpc_request_call_stack_init(&st); + /* call grpc_server_request_call, then wait for it to complete using + * pluck_event */ + err = grpc_server_request_call( + s->wrapped, &call, &st.details, &st.md_ary, + grpc_rb_get_wrapped_completion_queue(cqueue), + ROBJECT(tag_new)); if (err != GRPC_CALL_OK) { - rb_raise(rb_eCallError, "server request failed: %s (code=%d)", + grpc_request_call_stack_cleanup(&st); + rb_raise(rb_eCallError, "grpc_server_request_call failed: %s (code=%d)", grpc_call_error_detail_of(err), err); + return Qnil; } + ev = grpc_rb_completion_queue_pluck_event(cqueue, tag_new, timeout); + if (ev == NULL) { + grpc_request_call_stack_cleanup(&st); + return Qnil; + } + if (ev->data.op_complete != GRPC_OP_OK) { + grpc_request_call_stack_cleanup(&st); + grpc_event_finish(ev); + rb_raise(rb_eCallError, "request_call completion failed: (code=%d)", + ev->data.op_complete); + return Qnil; + } + + /* build the NewServerRpc struct result */ + result = rb_struct_new( + rb_sNewServerRpc, + rb_str_new2(st.details.method), + rb_str_new2(st.details.host), + rb_funcall(rb_cTime, id_at, 2, INT2NUM(st.details.deadline.tv_sec), + INT2NUM(st.details.deadline.tv_nsec)), + grpc_rb_md_ary_to_h(&st.md_ary), + grpc_rb_wrap_call(call), + NULL); + grpc_event_finish(ev); + grpc_request_call_stack_cleanup(&st); + return result; } return Qnil; } @@ -249,12 +323,13 @@ void Init_grpc_server() { rb_define_method(rb_cServer, "initialize_copy", grpc_rb_server_init_copy, 1); /* Add the server methods. */ - rb_define_method(rb_cServer, "request_call", grpc_rb_server_request_call, 1); + rb_define_method(rb_cServer, "request_call", grpc_rb_server_request_call, 3); rb_define_method(rb_cServer, "start", grpc_rb_server_start, 0); rb_define_method(rb_cServer, "destroy", grpc_rb_server_destroy, 0); rb_define_alias(rb_cServer, "close", "destroy"); rb_define_method(rb_cServer, "add_http2_port", grpc_rb_server_add_http2_port, -1); + id_at = rb_intern("at"); } /* Gets the wrapped server from the ruby wrapper */ -- cgit v1.2.3