aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar temiola <temiola@google.com>2014-12-18 10:58:22 -0800
committerGravatar Michael Lumish <mlumish@google.com>2014-12-19 13:16:21 -0800
commit21bb60cf4d4668c02c11fa68b694cdee11ab45ce (patch)
tree51c630206c095a7cd4425fdc84809fd49315eae3 /src
parentda029e39b6cef1c21982312a5f4943dce1a20c80 (diff)
Exposes event#finish as #close.
- ensures that it's a runtime error if an event if used after it's finished - updates all calls where the completion_queue is used to ensure the event's retrieved are explicitly finished Change on 2014/12/18 by temiola <temiola@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82445748
Diffstat (limited to 'src')
-rw-r--r--src/ruby/ext/grpc/rb_call.c1
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c6
-rw-r--r--src/ruby/ext/grpc/rb_event.c123
-rw-r--r--src/ruby/ext/grpc/rb_event.h11
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c11
-rw-r--r--src/ruby/ext/grpc/rb_grpc.h6
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb72
-rw-r--r--src/ruby/lib/grpc/generic/bidi_call.rb46
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb9
9 files changed, 205 insertions, 80 deletions
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 872f8e35ea..bf292fac75 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -38,7 +38,6 @@
#include <grpc/grpc.h>
#include "rb_byte_buffer.h"
#include "rb_completion_queue.h"
-#include "rb_event.h"
#include "rb_metadata.h"
#include "rb_grpc.h"
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index dfde44218b..dc95838ef5 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -141,8 +141,7 @@ static VALUE grpc_rb_completion_queue_next(VALUE self, VALUE timeout) {
if (next_call.event == NULL) {
return Qnil;
}
- return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish,
- next_call.event);
+ return grpc_rb_new_event(next_call.event);
}
/* Blocks until the next event for given tag is available, and returns the
@@ -160,8 +159,7 @@ static VALUE grpc_rb_completion_queue_pluck(VALUE self, VALUE tag,
if (next_call.event == NULL) {
return Qnil;
}
- return Data_Wrap_Struct(rb_cEvent, GC_NOT_MARKED, grpc_rb_event_finish,
- next_call.event);
+ return grpc_rb_new_event(next_call.event);
}
/* rb_cCompletionQueue is the ruby class that proxies grpc_completion_queue. */
diff --git a/src/ruby/ext/grpc/rb_event.c b/src/ruby/ext/grpc/rb_event.c
index 76ea6ad103..9200f923cc 100644
--- a/src/ruby/ext/grpc/rb_event.c
+++ b/src/ruby/ext/grpc/rb_event.c
@@ -41,12 +41,49 @@
#include "rb_call.h"
#include "rb_metadata.h"
+/* grpc_rb_event wraps a grpc_event. It provides a peer ruby object,
+ * 'mark' to minimize copying when an event is created from ruby. */
+typedef struct grpc_rb_event {
+ /* Holder of ruby objects involved in constructing the channel */
+ VALUE mark;
+ /* The actual event */
+ grpc_event *wrapped;
+} grpc_rb_event;
+
+
/* rb_mCompletionType is a ruby module that holds the completion type values */
VALUE rb_mCompletionType = Qnil;
-/* Helper function to free an event. */
-void grpc_rb_event_finish(void *p) {
- grpc_event_finish(p);
+/* Destroys Event instances. */
+static void grpc_rb_event_free(void *p) {
+ grpc_rb_event *ev = NULL;
+ if (p == NULL) {
+ return;
+ };
+ ev = (grpc_rb_event *)p;
+
+ /* Deletes the wrapped object if the mark object is Qnil, which indicates
+ * that no other object is the actual owner. */
+ if (ev->wrapped != NULL && ev->mark == Qnil) {
+ grpc_event_finish(ev->wrapped);
+ rb_warning("event gc: destroyed the c event");
+ } else {
+ rb_warning("event gc: did not destroy the c event");
+ }
+
+ xfree(p);
+}
+
+/* Protects the mark object from GC */
+static void grpc_rb_event_mark(void *p) {
+ grpc_rb_event *event = NULL;
+ if (p == NULL) {
+ return;
+ }
+ event = (grpc_rb_event *)p;
+ if (event->mark != Qnil) {
+ rb_gc_mark(event->mark);
+ }
}
static VALUE grpc_rb_event_result(VALUE self);
@@ -54,7 +91,14 @@ static VALUE grpc_rb_event_result(VALUE self);
/* Obtains the type of an event. */
static VALUE grpc_rb_event_type(VALUE self) {
grpc_event *event = NULL;
- Data_Get_Struct(self, grpc_event, event);
+ grpc_rb_event *wrapper = NULL;
+ Data_Get_Struct(self, grpc_rb_event, wrapper);
+ if (wrapper->wrapped == NULL) {
+ rb_raise(rb_eRuntimeError, "finished!");
+ return Qnil;
+ }
+
+ event = wrapper->wrapped;
switch (event->type) {
case GRPC_QUEUE_SHUTDOWN:
return rb_const_get(rb_mCompletionType, rb_intern("QUEUE_SHUTDOWN"));
@@ -94,7 +138,14 @@ static VALUE grpc_rb_event_type(VALUE self) {
/* Obtains the tag associated with an event. */
static VALUE grpc_rb_event_tag(VALUE self) {
grpc_event *event = NULL;
- Data_Get_Struct(self, grpc_event, event);
+ grpc_rb_event *wrapper = NULL;
+ Data_Get_Struct(self, grpc_rb_event, wrapper);
+ if (wrapper->wrapped == NULL) {
+ rb_raise(rb_eRuntimeError, "finished!");
+ return Qnil;
+ }
+
+ event = wrapper->wrapped;
if (event->tag == NULL) {
return Qnil;
}
@@ -103,10 +154,17 @@ static VALUE grpc_rb_event_tag(VALUE self) {
/* Obtains the call associated with an event. */
static VALUE grpc_rb_event_call(VALUE self) {
- grpc_event *ev = NULL;
- Data_Get_Struct(self, grpc_event, ev);
- if (ev->call != NULL) {
- return grpc_rb_wrap_call(ev->call);
+ grpc_event *event = NULL;
+ grpc_rb_event *wrapper = NULL;
+ Data_Get_Struct(self, grpc_rb_event, wrapper);
+ if (wrapper->wrapped == NULL) {
+ rb_raise(rb_eRuntimeError, "finished!");
+ return Qnil;
+ }
+
+ event = wrapper->wrapped;
+ if (event->call != NULL) {
+ return grpc_rb_wrap_call(event->call);
}
return Qnil;
}
@@ -114,6 +172,7 @@ static VALUE grpc_rb_event_call(VALUE self) {
/* Obtains the metadata associated with an event. */
static VALUE grpc_rb_event_metadata(VALUE self) {
grpc_event *event = NULL;
+ grpc_rb_event *wrapper = NULL;
grpc_metadata *metadata = NULL;
VALUE key = Qnil;
VALUE new_ary = Qnil;
@@ -121,9 +180,14 @@ static VALUE grpc_rb_event_metadata(VALUE self) {
VALUE value = Qnil;
size_t count = 0;
size_t i = 0;
+ Data_Get_Struct(self, grpc_rb_event, wrapper);
+ if (wrapper->wrapped == NULL) {
+ rb_raise(rb_eRuntimeError, "finished!");
+ return Qnil;
+ }
/* Figure out which metadata to read. */
- Data_Get_Struct(self, grpc_event, event);
+ event = wrapper->wrapped;
switch (event->type) {
case GRPC_CLIENT_METADATA_READ:
@@ -179,7 +243,13 @@ static VALUE grpc_rb_event_metadata(VALUE self) {
/* Obtains the data associated with an event. */
static VALUE grpc_rb_event_result(VALUE self) {
grpc_event *event = NULL;
- Data_Get_Struct(self, grpc_event, event);
+ grpc_rb_event *wrapper = NULL;
+ Data_Get_Struct(self, grpc_rb_event, wrapper);
+ if (wrapper->wrapped == NULL) {
+ rb_raise(rb_eRuntimeError, "finished!");
+ return Qnil;
+ }
+ event = wrapper->wrapped;
switch (event->type) {
@@ -245,11 +315,19 @@ static VALUE grpc_rb_event_result(VALUE self) {
return Qfalse;
}
-/* rb_sNewServerRpc is the struct that holds new server rpc details. */
-VALUE rb_sNewServerRpc = Qnil;
-
-/* rb_sStatus is the struct that holds status details. */
-VALUE rb_sStatus = Qnil;
+static VALUE grpc_rb_event_finish(VALUE self) {
+ grpc_event *event = NULL;
+ grpc_rb_event *wrapper = NULL;
+ Data_Get_Struct(self, grpc_rb_event, wrapper);
+ if (wrapper->wrapped == NULL) { /* already closed */
+ return Qnil;
+ }
+ event = wrapper->wrapped;
+ grpc_event_finish(event);
+ wrapper->wrapped = NULL;
+ wrapper->mark = Qnil;
+ return Qnil;
+}
/* rb_cEvent is the Event class whose instances proxy grpc_event */
VALUE rb_cEvent = Qnil;
@@ -262,9 +340,6 @@ void Init_google_rpc_event() {
rb_eEventError = rb_define_class_under(rb_mGoogleRpcCore, "EventError",
rb_eStandardError);
rb_cEvent = rb_define_class_under(rb_mGoogleRpcCore, "Event", rb_cObject);
- rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
- "deadline", "metadata", NULL);
- rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
/* Prevent allocation or inialization from ruby. */
rb_define_alloc_func(rb_cEvent, grpc_rb_cannot_alloc);
@@ -276,6 +351,8 @@ void Init_google_rpc_event() {
rb_define_method(rb_cEvent, "result", grpc_rb_event_result, 0);
rb_define_method(rb_cEvent, "tag", grpc_rb_event_tag, 0);
rb_define_method(rb_cEvent, "type", grpc_rb_event_type, 0);
+ rb_define_method(rb_cEvent, "finish", grpc_rb_event_finish, 0);
+ rb_define_alias(rb_cEvent, "close", "finish");
/* Constants representing the completion types */
rb_mCompletionType = rb_define_module_under(rb_mGoogleRpcCore,
@@ -298,3 +375,11 @@ void Init_google_rpc_event() {
rb_define_const(rb_mCompletionType, "RESERVED",
INT2NUM(GRPC_COMPLETION_DO_NOT_USE));
}
+
+VALUE grpc_rb_new_event(grpc_event *ev) {
+ grpc_rb_event *wrapper = ALLOC(grpc_rb_event);
+ wrapper->wrapped = ev;
+ wrapper->mark = Qnil;
+ return Data_Wrap_Struct(rb_cEvent, grpc_rb_event_mark, grpc_rb_event_free,
+ wrapper);
+}
diff --git a/src/ruby/ext/grpc/rb_event.h b/src/ruby/ext/grpc/rb_event.h
index 459502c647..e4e4a796c2 100644
--- a/src/ruby/ext/grpc/rb_event.h
+++ b/src/ruby/ext/grpc/rb_event.h
@@ -35,12 +35,7 @@
#define GRPC_RB_EVENT_H_
#include <ruby.h>
-
-/* rb_sNewServerRpc is the struct that holds new server rpc details. */
-extern VALUE rb_sNewServerRpc;
-
-/* rb_sStruct is the struct that holds status details. */
-extern VALUE rb_sStatus;
+#include <grpc/grpc.h>
/* rb_cEvent is the Event class whose instances proxy grpc_event. */
extern VALUE rb_cEvent;
@@ -49,8 +44,8 @@ extern VALUE rb_cEvent;
event processing. */
extern VALUE rb_eEventError;
-/* Helper function to free an event. */
-void grpc_rb_event_finish(void *p);
+/* Used to create new ruby event objects */
+VALUE grpc_rb_new_event(grpc_event *ev);
/* Initializes the Event and EventError classes. */
void Init_google_rpc_event();
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index f0e432a6bc..eae011d33b 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -245,16 +245,27 @@ void grpc_rb_shutdown(void *vm) {
grpc_shutdown();
}
+/* Initialize the Google RPC module structs */
+
+/* rb_sNewServerRpc is the struct that holds new server rpc details. */
+VALUE rb_sNewServerRpc = Qnil;
+/* rb_sStatus is the struct that holds status details. */
+VALUE rb_sStatus = Qnil;
+
/* Initialize the Google RPC module. */
VALUE rb_mGoogle = Qnil;
VALUE rb_mGoogleRPC = Qnil;
VALUE rb_mGoogleRpcCore = Qnil;
+
void Init_grpc() {
grpc_init();
ruby_vm_at_exit(grpc_rb_shutdown);
rb_mGoogle = rb_define_module("Google");
rb_mGoogleRPC = rb_define_module_under(rb_mGoogle, "RPC");
rb_mGoogleRpcCore = rb_define_module_under(rb_mGoogleRPC, "Core");
+ rb_sNewServerRpc = rb_struct_define("NewServerRpc", "method", "host",
+ "deadline", "metadata", NULL);
+ rb_sStatus = rb_struct_define("Status", "code", "details", "metadata", NULL);
Init_google_rpc_byte_buffer();
Init_google_rpc_event();
diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h
index 68f8a06c30..c2c894244f 100644
--- a/src/ruby/ext/grpc/rb_grpc.h
+++ b/src/ruby/ext/grpc/rb_grpc.h
@@ -47,6 +47,12 @@ extern VALUE rb_mGoogleRpcCore;
/* Class used to wrap timeval structs. */
extern VALUE rb_cTimeVal;
+/* rb_sNewServerRpc is the struct that holds new server rpc details. */
+extern VALUE rb_sNewServerRpc;
+
+/* rb_sStruct is the struct that holds status details. */
+extern VALUE rb_sStatus;
+
/* GC_NOT_MARKED is used in calls to Data_Wrap_Struct to indicate that the
wrapped struct does not need to participate in ruby gc. */
extern const RUBY_DATA_FUNC GC_NOT_MARKED;
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index b16c8f8563..288ea083e6 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -73,6 +73,7 @@ module Google::RPC
# wait for the invocation to be accepted
ev = q.pluck(invoke_accepted, INFINITE_FUTURE)
raise OutOfTime if ev.nil?
+ ev.close
[finished_tag, client_metadata_read]
end
@@ -191,11 +192,17 @@ module Google::RPC
def writes_done(assert_finished=true)
@call.writes_done(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
- assert_event_type(ev, FINISH_ACCEPTED)
- logger.debug("Writes done: waiting for finish? #{assert_finished}")
+ begin
+ assert_event_type(ev, FINISH_ACCEPTED)
+ logger.debug("Writes done: waiting for finish? #{assert_finished}")
+ ensure
+ ev.close
+ end
+
if assert_finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
raise "unexpected event: #{ev.inspect}" if ev.nil?
+ ev.close
return @call.status
end
end
@@ -206,22 +213,21 @@ module Google::RPC
# event.
def finished
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
- if @call.metadata.nil?
- @call.metadata = ev.result.metadata
- else
- @call.metadata.merge!(ev.result.metadata)
- end
+ begin
+ raise "unexpected event: #{ev.inspect}" unless ev.type == FINISHED
+ if @call.metadata.nil?
+ @call.metadata = ev.result.metadata
+ else
+ @call.metadata.merge!(ev.result.metadata)
+ end
- if ev.result.code != Core::StatusCodes::OK
- raise BadStatus.new(ev.result.code, ev.result.details)
+ if ev.result.code != Core::StatusCodes::OK
+ raise BadStatus.new(ev.result.code, ev.result.details)
+ end
+ res = ev.result
+ ensure
+ ev.close
end
- res = ev.result
-
- # NOTE(temiola): This is necessary to allow the C call struct wrapped
- # within the active_call to be GCed; this is necessary so that other
- # C-level destructors get called in the required order.
- ev = nil # allow the event to be GCed
res
end
@@ -246,8 +252,11 @@ module Google::RPC
# call queue#pluck, and wait for WRITE_ACCEPTED, so as not to return
# until the flow control allows another send on this call.
ev = @cq.pluck(self, INFINITE_FUTURE)
- assert_event_type(ev, WRITE_ACCEPTED)
- ev = nil
+ begin
+ assert_event_type(ev, WRITE_ACCEPTED)
+ ensure
+ ev.close
+ end
end
# send_status sends a status to the remote endpoint
@@ -260,7 +269,11 @@ module Google::RPC
assert_queue_is_ready
@call.start_write_status(code, details, self)
ev = @cq.pluck(self, INFINITE_FUTURE)
- assert_event_type(ev, FINISH_ACCEPTED)
+ begin
+ assert_event_type(ev, FINISH_ACCEPTED)
+ ensure
+ ev.close
+ end
logger.debug("Status sent: #{code}:'#{details}'")
if assert_finished
return finished
@@ -283,13 +296,17 @@ module Google::RPC
@call.start_read(self)
ev = @cq.pluck(self, INFINITE_FUTURE)
- assert_event_type(ev, READ)
- logger.debug("received req: #{ev.result.inspect}")
- if !ev.result.nil?
- logger.debug("received req.to_s: #{ev.result.to_s}")
- res = @unmarshal.call(ev.result.to_s)
- logger.debug("received_req (unmarshalled): #{res.inspect}")
- return res
+ begin
+ assert_event_type(ev, READ)
+ logger.debug("received req: #{ev.result.inspect}")
+ if !ev.result.nil?
+ logger.debug("received req.to_s: #{ev.result.to_s}")
+ res = @unmarshal.call(ev.result.to_s)
+ logger.debug("received_req (unmarshalled): #{res.inspect}")
+ return res
+ end
+ ensure
+ ev.close
end
logger.debug('found nil; the final response has been sent')
nil
@@ -515,12 +532,15 @@ module Google::RPC
# confirms that no events are enqueued, and that the queue is not
# shutdown.
def assert_queue_is_ready
+ ev = nil
begin
ev = @cq.pluck(self, ZERO)
raise "unexpected event #{ev.inspect}" unless ev.nil?
rescue OutOfTime
# expected, nothing should be on the queue and the deadline was ZERO,
# except things using another tag
+ ensure
+ ev.close unless ev.nil?
end
end
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index fc9bb851aa..066ec851ac 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -149,15 +149,27 @@ module Google::RPC
payload = @marshal.call(req)
@call.start_write(Core::ByteBuffer.new(payload), write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- assert_event_type(ev, WRITE_ACCEPTED)
+ begin
+ assert_event_type(ev, WRITE_ACCEPTED)
+ ensure
+ ev.close
+ end
end
if is_client
@call.writes_done(write_tag)
ev = @cq.pluck(write_tag, INFINITE_FUTURE)
- assert_event_type(ev, FINISH_ACCEPTED)
+ begin
+ assert_event_type(ev, FINISH_ACCEPTED)
+ ensure
+ ev.close
+ end
logger.debug("bidi-client: sent #{count} reqs, waiting to finish")
ev = @cq.pluck(@finished_tag, INFINITE_FUTURE)
- assert_event_type(ev, FINISHED)
+ begin
+ assert_event_type(ev, FINISHED)
+ ensure
+ ev.close
+ end
logger.debug('bidi-client: finished received')
end
rescue StandardError => e
@@ -180,19 +192,23 @@ module Google::RPC
count += 1
@call.start_read(read_tag)
ev = @cq.pluck(read_tag, INFINITE_FUTURE)
- assert_event_type(ev, READ)
-
- # handle the next event.
- if ev.result.nil?
- @readq.push(END_OF_READS)
- logger.debug('done reading!')
- break
+ begin
+ assert_event_type(ev, READ)
+
+ # handle the next event.
+ if ev.result.nil?
+ @readq.push(END_OF_READS)
+ logger.debug('done reading!')
+ break
+ end
+
+ # push the latest read onto the queue and continue reading
+ logger.debug("received req.to_s: #{ev.result.to_s}")
+ res = @unmarshal.call(ev.result.to_s)
+ @readq.push(res)
+ ensure
+ ev.close
end
-
- # push the latest read onto the queue and continue reading
- logger.debug("received req.to_s: #{ev.result.to_s}")
- res = @unmarshal.call(ev.result.to_s)
- @readq.push(res)
end
rescue StandardError => e
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 2054d73b48..81db68804e 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -217,18 +217,13 @@ module Google::RPC
next if ev.nil?
if ev.type != SERVER_RPC_NEW
logger.warn("bad evt: got:#{ev.type}, want:#{SERVER_RPC_NEW}")
+ ev.close
next
end
c = new_active_server_call(ev.call, ev.result)
if !c.nil?
mth = ev.result.method.to_sym
-
- # NOTE(temiola): This is necessary to allow the C call struct wrapped
- # within the active_call created by the event to be GCed; this is
- # necessary so that other C-level destructors get called in the
- # required order.
- ev = nil
-
+ ev.close
@pool.schedule(c) do |call|
rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
end