diff options
Diffstat (limited to 'src')
36 files changed, 485 insertions, 76 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index c01e830cd6..4cc6a4cf39 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -1106,8 +1106,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "const $Request$& request, " "::grpc::CompletionQueue* cq) {\n"); printer->Print(*vars, - " return new " - "::grpc::ClientAsyncResponseReader< $Response$>(" + " return " + "::grpc::ClientAsyncResponseReader< $Response$>::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request);\n" @@ -1129,7 +1129,7 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, $Response$* response, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return new ::grpc::ClientAsyncWriter< $Request$>(" + " return ::grpc::ClientAsyncWriter< $Request$>::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, response, tag);\n" @@ -1152,7 +1152,7 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientContext* context, const $Request$& request, " "::grpc::CompletionQueue* cq, void* tag) {\n"); printer->Print(*vars, - " return new ::grpc::ClientAsyncReader< $Response$>(" + " return ::grpc::ClientAsyncReader< $Response$>::Create(" "channel_.get(), cq, " "rpcmethod_$Method$_, " "context, request, tag);\n" @@ -1174,13 +1174,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer, "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* " "$ns$$Service$::Stub::Async$Method$Raw(::grpc::ClientContext* context, " "::grpc::CompletionQueue* cq, void* tag) {\n"); - printer->Print(*vars, - " return new " - "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>(" - "channel_.get(), cq, " - "rpcmethod_$Method$_, " - "context, tag);\n" - "}\n\n"); + printer->Print( + *vars, + " return " + "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create(" + "channel_.get(), cq, " + "rpcmethod_$Method$_, " + "context, tag);\n" + "}\n\n"); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 291a1f1b0b..a271d05ca8 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -1153,7 +1153,7 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_call != NULL); - grpc_call_destroy(glb_policy->lb_call); + grpc_call_unref(glb_policy->lb_call); glb_policy->lb_call = NULL; grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index a42fe7ef3c..9aa457d792 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -160,6 +160,7 @@ typedef struct { } child_call; struct grpc_call { + gpr_refcount ext_ref; gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; @@ -170,7 +171,7 @@ struct grpc_call { /* client or server call */ bool is_client; - /** has grpc_call_destroy been called */ + /** has grpc_call_unref been called */ bool destroy_called; /** flag indicating that cancellation is inherited */ bool cancellation_is_inherited; @@ -282,6 +283,10 @@ static void add_init_error(grpc_error **composite, grpc_error *new) { *composite = grpc_error_add_child(*composite, new); } +void *grpc_call_arena_alloc(grpc_call *call, size_t size) { + return gpr_arena_alloc(call->arena, size); +} + static parent_call *get_or_create_parent_call(grpc_call *call) { parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); if (p == NULL) { @@ -312,6 +317,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); call = gpr_arena_alloc(arena, sizeof(grpc_call) + channel_stack->call_stack_size); + gpr_ref_init(&call->ext_ref, 1); call->arena = arena; *out_call = call; call->channel = args->channel; @@ -408,7 +414,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, call->send_deadline = send_deadline; GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); - /* initial refcount dropped by grpc_call_destroy */ + /* initial refcount dropped by grpc_call_unref */ grpc_call_element_args call_args = { .call_stack = CALL_STACK_FROM_CALL(call), .server_transport_data = args->server_transport_data, @@ -533,12 +539,16 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("destroy_call", 0); } -void grpc_call_destroy(grpc_call *c) { +void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } + +void grpc_call_unref(grpc_call *c) { + if (!gpr_unref(&c->ext_ref)) return; + child_call *cc = c->child_call; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - GPR_TIMER_BEGIN("grpc_call_destroy", 0); - GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); + GPR_TIMER_BEGIN("grpc_call_unref", 0); + GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); if (cc) { parent_call *pc = get_parent_call(cc->parent); @@ -565,7 +575,7 @@ void grpc_call_destroy(grpc_call *c) { } GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy"); grpc_exec_ctx_finish(&exec_ctx); - GPR_TIMER_END("grpc_call_destroy", 0); + GPR_TIMER_END("grpc_call_unref", 0); } grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 38800ea37b..26c81e9aca 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -345,7 +345,7 @@ static void request_matcher_destroy(request_matcher *rm) { static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, grpc_error *error) { - grpc_call_destroy(grpc_call_from_top_element(elem)); + grpc_call_unref(grpc_call_from_top_element(elem)); } static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index c985183ae7..fac1ba9d43 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -131,7 +131,7 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; - ops->FillOps(cops, &nops); + ops->FillOps(call->call(), cops, &nops); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(call->call(), cops, nops, ops, nullptr)); } diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 3d884cf62e..2516232840 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -74,7 +74,7 @@ ClientContext::ClientContext() ClientContext::~ClientContext() { if (call_) { - grpc_call_destroy(call_); + grpc_call_unref(call_); } g_client_callbacks->Destructor(this); } diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc index 7a2fdf941c..4adb2a5359 100644 --- a/src/cpp/client/generic_stub.cc +++ b/src/cpp/client/generic_stub.cc @@ -42,7 +42,7 @@ std::unique_ptr<GenericClientAsyncReaderWriter> GenericStub::Call( ClientContext* context, const grpc::string& method, CompletionQueue* cq, void* tag) { return std::unique_ptr<GenericClientAsyncReaderWriter>( - new GenericClientAsyncReaderWriter( + GenericClientAsyncReaderWriter::Create( channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING), context, tag)); } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 0dd758ec4e..4f8b826aec 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -96,6 +96,12 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } +void CoreCodegen::grpc_call_ref(grpc_call* call) { ::grpc_call_ref(call); } +void CoreCodegen::grpc_call_unref(grpc_call* call) { ::grpc_call_unref(call); } +void* CoreCodegen::grpc_call_arena_alloc(grpc_call* call, size_t length) { + return ::grpc_call_arena_alloc(call, length); +} + int CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader, grpc_byte_buffer* buffer) { return ::grpc_byte_buffer_reader_init(reader, buffer); diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 423f347acd..b4d6961db8 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -607,7 +607,7 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) { static const size_t MAX_OPS = 8; size_t nops = 0; grpc_op cops[MAX_OPS]; - ops->FillOps(cops, &nops); + ops->FillOps(call->call(), cops, &nops); auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr); GPR_ASSERT(GRPC_CALL_OK == result); } diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc index 33ad0ae142..923556413e 100644 --- a/src/cpp/server/server_context.cc +++ b/src/cpp/server/server_context.cc @@ -62,7 +62,7 @@ class ServerContext::CompletionOp final : public CallOpSetInterface { finalized_(false), cancelled_(0) {} - void FillOps(grpc_op* ops, size_t* nops) override; + void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override; bool FinalizeResult(void** tag, bool* status) override; bool CheckCancelled(CompletionQueue* cq) { @@ -100,7 +100,8 @@ void ServerContext::CompletionOp::Unref() { } } -void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) { +void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops, + size_t* nops) { ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER; ops->data.recv_close_on_server.cancelled = &cancelled_; ops->flags = 0; @@ -151,7 +152,7 @@ ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr) ServerContext::~ServerContext() { if (call_) { - grpc_call_destroy(call_); + grpc_call_unref(call_); } if (completion_op_) { completion_op_->Unref(); diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c index 88adef0333..f6cff454bd 100644 --- a/src/csharp/ext/grpc_csharp_ext.c +++ b/src/csharp/ext/grpc_csharp_ext.c @@ -526,7 +526,7 @@ GPR_EXPORT char *GPR_CALLTYPE grpcsharp_call_get_peer(grpc_call *call) { GPR_EXPORT void GPR_CALLTYPE gprsharp_free(void *p) { gpr_free(p); } GPR_EXPORT void GPR_CALLTYPE grpcsharp_call_destroy(grpc_call *call) { - grpc_call_destroy(call); + grpc_call_unref(call); } GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_unary( diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 0f192ac90a..fe0c80e642 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -248,6 +248,7 @@ class SendMessageOp : public Op { out->data.send_message.send_message = send_message; return true; } + bool IsFinalOp() { return false; } void OnComplete(bool success) {} @@ -264,6 +265,7 @@ class SendClientCloseOp : public Op { EscapableHandleScope scope; return scope.Escape(Nan::True()); } + bool ParseOp(Local<Value> value, grpc_op *out) { return true; } bool IsFinalOp() { return false; } void OnComplete(bool success) {} @@ -501,7 +503,7 @@ void DestroyTag(void *tag) { void Call::DestroyCall() { if (this->wrapped_call != NULL) { - grpc_call_destroy(this->wrapped_call); + grpc_call_unref(this->wrapped_call); this->wrapped_call = NULL; } } diff --git a/src/node/performance/benchmark_client.js b/src/node/performance/benchmark_client.js index 5ef5260a96..e7c426b2ff 100644 --- a/src/node/performance/benchmark_client.js +++ b/src/node/performance/benchmark_client.js @@ -88,7 +88,10 @@ function timeDiffToNanos(time_diff) { */ function BenchmarkClient(server_targets, channels, histogram_params, security_params) { - var options = {}; + var options = { + "grpc.max_receive_message_length": -1, + "grpc.max_send_message_length": -1 + }; var creds; if (security_params) { var ca_path; @@ -180,6 +183,8 @@ BenchmarkClient.prototype.startClosedLoop = function( self.last_wall_time = process.hrtime(); + self.last_usage = process.cpuUsage(); + var makeCall; var argument; @@ -270,6 +275,8 @@ BenchmarkClient.prototype.startPoisson = function( self.last_wall_time = process.hrtime(); + self.last_usage = process.cpuUsage(); + var makeCall; var argument; @@ -354,9 +361,11 @@ BenchmarkClient.prototype.startPoisson = function( */ BenchmarkClient.prototype.mark = function(reset) { var wall_time_diff = process.hrtime(this.last_wall_time); + var usage_diff = process.cpuUsage(this.last_usage); var histogram = this.histogram; if (reset) { this.last_wall_time = process.hrtime(); + this.last_usage = process.cpuUsage(); this.histogram = new Histogram(histogram.resolution, histogram.max_possible); } @@ -371,9 +380,8 @@ BenchmarkClient.prototype.mark = function(reset) { count: histogram.getCount() }, time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9, - // Not sure how to measure these values - time_user: 0, - time_system: 0 + time_user: usage_diff.user / 1000000, + time_system: usage_diff.system / 1000000 }; }; diff --git a/src/node/performance/benchmark_client_express.js b/src/node/performance/benchmark_client_express.js index e749956599..157bf1b6de 100644 --- a/src/node/performance/benchmark_client_express.js +++ b/src/node/performance/benchmark_client_express.js @@ -95,7 +95,6 @@ function BenchmarkClient(server_targets, channels, histogram_params, var host_port; host_port = server_targets[i % server_targets.length].split(':'); var new_options = _.assign({hostname: host_port[0], port: +host_port[1]}, options); - new_options.agent = new protocol.Agent(new_options); this.client_options[i] = new_options; } @@ -137,6 +136,7 @@ BenchmarkClient.prototype.startClosedLoop = function( } self.last_wall_time = process.hrtime(); + self.last_usage = process.cpuUsage(); var argument = { response_size: resp_size, @@ -207,6 +207,7 @@ BenchmarkClient.prototype.startPoisson = function( } self.last_wall_time = process.hrtime(); + self.last_usage = process.cpuUsage(); var argument = { response_size: resp_size, @@ -264,9 +265,11 @@ BenchmarkClient.prototype.startPoisson = function( */ BenchmarkClient.prototype.mark = function(reset) { var wall_time_diff = process.hrtime(this.last_wall_time); + var usage_diff = process.cpuUsage(this.last_usage); var histogram = this.histogram; if (reset) { this.last_wall_time = process.hrtime(); + this.last_usage = process.cpuUsage(); this.histogram = new Histogram(histogram.resolution, histogram.max_possible); } @@ -281,9 +284,8 @@ BenchmarkClient.prototype.mark = function(reset) { count: histogram.getCount() }, time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9, - // Not sure how to measure these values - time_user: 0, - time_system: 0 + time_user: usage_diff.user / 1000000, + time_system: usage_diff.system / 1000000 }; }; diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js index ea85029d98..a4d5ee1c26 100644 --- a/src/node/performance/benchmark_server.js +++ b/src/node/performance/benchmark_server.js @@ -132,7 +132,12 @@ function BenchmarkServer(host, port, tls, generic, response_size) { server_creds = grpc.ServerCredentials.createInsecure(); } - var server = new grpc.Server(); + var options = { + "grpc.max_receive_message_length": -1, + "grpc.max_send_message_length": -1 + }; + + var server = new grpc.Server(options); this.port = server.bind(host + ':' + port, server_creds); if (generic) { server.addService(genericService, { @@ -156,6 +161,7 @@ util.inherits(BenchmarkServer, EventEmitter); BenchmarkServer.prototype.start = function() { this.server.start(); this.last_wall_time = process.hrtime(); + this.last_usage = process.cpuUsage(); this.emit('started'); }; @@ -175,14 +181,15 @@ BenchmarkServer.prototype.getPort = function() { */ BenchmarkServer.prototype.mark = function(reset) { var wall_time_diff = process.hrtime(this.last_wall_time); + var usage_diff = process.cpuUsage(this.last_usage); if (reset) { this.last_wall_time = process.hrtime(); + this.last_usage = process.cpuUsage(); } return { time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9, - // Not sure how to measure these values - time_user: 0, - time_system: 0 + time_user: usage_diff.user / 1000000, + time_system: usage_diff.system / 1000000 }; }; diff --git a/src/node/performance/benchmark_server_express.js b/src/node/performance/benchmark_server_express.js index 4b695eb467..fab4f5307c 100644 --- a/src/node/performance/benchmark_server_express.js +++ b/src/node/performance/benchmark_server_express.js @@ -81,6 +81,7 @@ BenchmarkServer.prototype.start = function() { var self = this; this.server.listen(this.input_port, this.input_hostname, function() { self.last_wall_time = process.hrtime(); + self.last_usage = process.cpuUsage(); self.emit('started'); }); }; @@ -91,14 +92,15 @@ BenchmarkServer.prototype.getPort = function() { BenchmarkServer.prototype.mark = function(reset) { var wall_time_diff = process.hrtime(this.last_wall_time); + var usage_diff = process.cpuUsage(this.last_usage); if (reset) { this.last_wall_time = process.hrtime(); + this.last_usage = process.cpuUsage(); } return { time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9, - // Not sure how to measure these values - time_user: 0, - time_system: 0 + time_user: usage_diff.user / 1000000, + time_system: usage_diff.system / 1000000 }; }; diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m index 46e9fee7e1..1faba3e20b 100644 --- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m +++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m @@ -315,7 +315,7 @@ } - (void)dealloc { - grpc_call_destroy(_call); + grpc_call_unref(_call); } @end diff --git a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m index 6d94116c75..a78dd93993 100644 --- a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m +++ b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m @@ -258,7 +258,7 @@ unsigned int parse_h2_length(const char *field) { grpc_metadata_array_destroy(&request_metadata_recv); grpc_call_details_destroy(&call_details); - grpc_call_destroy(c); + grpc_call_unref(c); cq_verifier_destroy(cqv); @@ -437,7 +437,7 @@ unsigned int parse_h2_length(const char *field) { grpc_metadata_array_destroy(&request_metadata_recv); grpc_call_details_destroy(&call_details); - grpc_call_destroy(c); + grpc_call_unref(c); cq_verifier_destroy(cqv); diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 48a374fa08..d3fd88416b 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -65,7 +65,7 @@ static zend_object_handlers call_ce_handlers; /* Frees and destroys an instance of wrapped_grpc_call */ PHP_GRPC_FREE_WRAPPED_FUNC_START(wrapped_grpc_call) if (p->owned && p->wrapped != NULL) { - grpc_call_destroy(p->wrapped); + grpc_call_unref(p->wrapped); } PHP_GRPC_FREE_WRAPPED_FUNC_END() diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi index cc3bd7a067..aa3558b843 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/call.pyx.pxi @@ -106,7 +106,7 @@ cdef class Call: def __dealloc__(self): if self.c_call != NULL: - grpc_call_destroy(self.c_call) + grpc_call_unref(self.c_call) grpc_shutdown() # The object *should* always be valid from Python. Used for debugging. diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 0b2bdef48b..4f1776e002 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -329,7 +329,7 @@ cdef extern from "grpc/grpc.h": const char *description, void *reserved) nogil char *grpc_call_get_peer(grpc_call *call) nogil - void grpc_call_destroy(grpc_call *call) nogil + void grpc_call_unref(grpc_call *call) nogil grpc_channel *grpc_insecure_channel_create(const char *target, const grpc_channel_args *args, diff --git a/src/ruby/end2end/forking_client_client.rb b/src/ruby/end2end/forking_client_client.rb new file mode 100755 index 0000000000..c358d79f59 --- /dev/null +++ b/src/ruby/end2end/forking_client_client.rb @@ -0,0 +1,69 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# Prompted by and minimal repro of https://github.com/grpc/grpc/issues/10658 + +require_relative './end2end_common' + +def main + server_port = '' + OptionParser.new do |opts| + opts.on('--client_control_port=P', String) do + STDERR.puts 'client control port not used' + end + opts.on('--server_port=P', String) do |p| + server_port = p + end + end.parse! + + p = fork do + stub = Echo::EchoServer::Stub.new("localhost:#{server_port}", + :this_channel_is_insecure) + stub.echo(Echo::EchoRequest.new(request: 'hello')) + end + + begin + Timeout.timeout(10) do + Process.wait(p) + end + rescue Timeout::Error + STDERR.puts "timeout waiting for forked process #{p}" + Process.kill('SIGKILL', p) + Process.wait(p) + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when using gRPC after loading it and then forking' + end + + client_exit_code = $CHILD_STATUS + fail "forked process failed #{client_exit_code}" if client_exit_code != 0 +end + +main diff --git a/src/ruby/end2end/forking_client_driver.rb b/src/ruby/end2end/forking_client_driver.rb new file mode 100755 index 0000000000..2c67b33590 --- /dev/null +++ b/src/ruby/end2end/forking_client_driver.rb @@ -0,0 +1,69 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +def main + STDERR.puts 'start server' + server_runner = ServerRunner.new(EchoServerImpl) + server_port = server_runner.run + + # TODO(apolcyn) Can we get rid of this sleep? + # Without it, an immediate call to the just started EchoServer + # fails with UNAVAILABLE + sleep 1 + + STDERR.puts 'start client' + _, client_pid = start_client('forking_client_client.rb', + server_port) + + begin + Timeout.timeout(10) do + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout wait for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when requiring grpc, then forking, then using grpc ' + end + + client_exit_code = $CHILD_STATUS + if client_exit_code != 0 + fail "forking client client failed, exit code #{client_exit_code}" + end + + server_runner.stop +end + +main diff --git a/src/ruby/end2end/grpc_class_init_client.rb b/src/ruby/end2end/grpc_class_init_client.rb new file mode 100755 index 0000000000..ee79292119 --- /dev/null +++ b/src/ruby/end2end/grpc_class_init_client.rb @@ -0,0 +1,77 @@ +#!/usr/bin/env ruby + +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# For GRPC::Core classes, which use the grpc c-core, object init +# is interesting because it's related to overall library init. + +require_relative './end2end_common' + +def main + grpc_class = '' + OptionParser.new do |opts| + opts.on('--grpc_class=P', String) do |p| + grpc_class = p + end + end.parse! + + test_proc = nil + + case grpc_class + when 'channel' + test_proc = proc do + GRPC::Core::Channel.new('dummy_host', nil, :this_channel_is_insecure) + end + when 'server' + test_proc = proc do + GRPC::Core::Server.new({}) + end + when 'channel_credentials' + test_proc = proc do + GRPC::Core::ChannelCredentials.new + end + when 'call_credentials' + test_proc = proc do + GRPC::Core::CallCredentials.new(proc { |noop| noop }) + end + when 'compression_options' + test_proc = proc do + GRPC::Core::CompressionOptions.new + end + else + fail "bad --grpc_class=#{grpc_class} param" + end + + th = Thread.new { test_proc.call } + test_proc.call + th.join +end + +main diff --git a/src/ruby/end2end/grpc_class_init_driver.rb b/src/ruby/end2end/grpc_class_init_driver.rb new file mode 100755 index 0000000000..764d029f14 --- /dev/null +++ b/src/ruby/end2end/grpc_class_init_driver.rb @@ -0,0 +1,67 @@ +#!/usr/bin/env ruby + +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +require_relative './end2end_common' + +def main + native_grpc_classes = %w( channel + server + channel_credentials + call_credentials + compression_options ) + + native_grpc_classes.each do |grpc_class| + STDERR.puts 'start client' + this_dir = File.expand_path(File.dirname(__FILE__)) + client_path = File.join(this_dir, 'grpc_class_init_client.rb') + client_pid = Process.spawn(RbConfig.ruby, + client_path, + "--grpc_class=#{grpc_class}") + begin + Timeout.timeout(10) do + Process.wait(client_pid) + end + rescue Timeout::Error + STDERR.puts "timeout waiting for client pid #{client_pid}" + Process.kill('SIGKILL', client_pid) + Process.wait(client_pid) + STDERR.puts 'killed client child' + raise 'Timed out waiting for client process. ' \ + 'It likely hangs when the first constructed gRPC object has ' \ + "type: #{grpc_class}" + end + + client_exit_code = $CHILD_STATUS + fail "client failed, exit code #{client_exit_code}" if client_exit_code != 0 + end +end + +main diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index aef7175af9..0067e10866 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -101,7 +101,7 @@ typedef struct grpc_rb_call { static void destroy_call(grpc_rb_call *call) { /* Ensure that we only try to destroy the call once */ if (call->wrapped != NULL) { - grpc_call_destroy(call->wrapped); + grpc_call_unref(call->wrapped); call->wrapped = NULL; grpc_rb_completion_queue_destroy(call->queue); call->queue = NULL; diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index d6545502b0..7a5a74580d 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -223,6 +223,8 @@ static VALUE grpc_rb_call_credentials_init(VALUE self, VALUE proc) { grpc_call_credentials *creds = NULL; grpc_metadata_credentials_plugin plugin; + grpc_ruby_once_init(); + TypedData_Get_Struct(self, grpc_rb_call_credentials, &grpc_rb_call_credentials_data_type, wrapper); @@ -283,8 +285,6 @@ void Init_grpc_call_credentials() { grpc_rb_call_credentials_compose, -1); id_callback = rb_intern("__callback"); - - grpc_rb_event_queue_thread_start(); } /* Gets the wrapped grpc_call_credentials from the ruby wrapper */ diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index ecfffd1839..a802183726 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -89,10 +89,14 @@ typedef struct grpc_rb_channel { static void grpc_rb_channel_try_register_connection_polling( grpc_rb_channel *wrapper); static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper); +static void *wait_until_channel_polling_thread_started_no_gil(void *); +static void wait_until_channel_polling_thread_started_unblocking_func(void *); static grpc_completion_queue *channel_polling_cq; static gpr_mu global_connection_polling_mu; +static gpr_cv global_connection_polling_cv; static int abort_channel_polling = 0; +static int channel_polling_thread_started = 0; /* Destroys Channel instances. */ static void grpc_rb_channel_free(void *p) { @@ -166,6 +170,11 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { grpc_channel_args args; MEMZERO(&args, grpc_channel_args, 1); + grpc_ruby_once_init(); + rb_thread_call_without_gvl( + wait_until_channel_polling_thread_started_no_gil, NULL, + wait_until_channel_polling_thread_started_unblocking_func, NULL); + /* "3" == 3 mandatory args */ rb_scan_args(argc, argv, "3", &target, &channel_args, &credentials); @@ -440,6 +449,7 @@ static void grpc_rb_channel_try_register_connection_polling( } gpr_mu_lock(&global_connection_polling_mu); + GPR_ASSERT(channel_polling_thread_started || abort_channel_polling); conn_state = grpc_channel_check_connectivity_state(wrapper->wrapped, 0); if (conn_state != wrapper->current_connectivity_state) { wrapper->current_connectivity_state = conn_state; @@ -473,7 +483,7 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { } // Note this loop breaks out with a single call of -// "grpc_rb_event_unblocking_func". +// "run_poll_channels_loop_no_gil". // This assumes that a ruby call the unblocking func // indicates process shutdown. // In the worst case, this stops polling channel connectivity @@ -481,6 +491,14 @@ static void grpc_rb_channel_safe_destroy(grpc_rb_channel *wrapper) { static void *run_poll_channels_loop_no_gil(void *arg) { grpc_event event; (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop_no_gil - begin"); + + gpr_mu_lock(&global_connection_polling_mu); + GPR_ASSERT(!channel_polling_thread_started); + channel_polling_thread_started = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); + for (;;) { event = grpc_completion_queue_next( channel_polling_cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); @@ -500,7 +518,7 @@ static void *run_poll_channels_loop_no_gil(void *arg) { } // Notify the channel polling loop to cleanup and shutdown. -static void grpc_rb_event_unblocking_func(void *arg) { +static void run_poll_channels_loop_unblocking_func(void *arg) { (void)arg; gpr_mu_lock(&global_connection_polling_mu); gpr_log(GPR_DEBUG, @@ -518,10 +536,37 @@ static VALUE run_poll_channels_loop(VALUE arg) { GPR_DEBUG, "GRPC_RUBY: run_poll_channels_loop - create connection polling thread"); rb_thread_call_without_gvl(run_poll_channels_loop_no_gil, NULL, - grpc_rb_event_unblocking_func, NULL); + run_poll_channels_loop_unblocking_func, NULL); + return Qnil; } +static void *wait_until_channel_polling_thread_started_no_gil(void *arg) { + (void)arg; + gpr_log(GPR_DEBUG, "GRPC_RUBY: wait for channel polling thread to start"); + gpr_mu_lock(&global_connection_polling_mu); + while (!channel_polling_thread_started && !abort_channel_polling) { + gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, + gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&global_connection_polling_mu); + + return NULL; +} + +static void wait_until_channel_polling_thread_started_unblocking_func( + void *arg) { + (void)arg; + gpr_mu_lock(&global_connection_polling_mu); + gpr_log(GPR_DEBUG, + "GRPC_RUBY: " + "wait_until_channel_polling_thread_started_unblocking_func - begin " + "aborting connection polling"); + abort_channel_polling = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); +} + /* Temporary fix for * https://github.com/GoogleCloudPlatform/google-cloud-ruby/issues/899. * Transports in idle channels can get destroyed. Normally c-core re-connects, @@ -532,11 +577,26 @@ static VALUE run_poll_channels_loop(VALUE arg) { * calls - so that c-core can reconnect if needed, when there aren't any RPC's. * TODO(apolcyn) remove this when core handles new RPCs on dead connections. */ -static void start_poll_channels_loop() { - channel_polling_cq = grpc_completion_queue_create_for_next(NULL); +void grpc_rb_channel_polling_thread_start() { + VALUE background_thread = Qnil; + + GPR_ASSERT(!abort_channel_polling); + GPR_ASSERT(!channel_polling_thread_started); + GPR_ASSERT(channel_polling_cq == NULL); + gpr_mu_init(&global_connection_polling_mu); - abort_channel_polling = 0; - rb_thread_create(run_poll_channels_loop, NULL); + gpr_cv_init(&global_connection_polling_cv); + + channel_polling_cq = grpc_completion_queue_create_for_next(NULL); + background_thread = rb_thread_create(run_poll_channels_loop, NULL); + + if (!RTEST(background_thread)) { + gpr_log(GPR_DEBUG, "GRPC_RUBY: failed to spawn channel polling thread"); + gpr_mu_lock(&global_connection_polling_mu); + abort_channel_polling = 1; + gpr_cv_broadcast(&global_connection_polling_cv); + gpr_mu_unlock(&global_connection_polling_mu); + } } static void Init_grpc_propagate_masks() { @@ -608,7 +668,6 @@ void Init_grpc_channel() { id_insecure_channel = rb_intern("this_channel_is_insecure"); Init_grpc_propagate_masks(); Init_grpc_connectivity_states(); - start_poll_channels_loop(); } /* Gets the wrapped channel from the ruby wrapper */ diff --git a/src/ruby/ext/grpc/rb_channel.h b/src/ruby/ext/grpc/rb_channel.h index 77e1f6acbc..fdceb79bca 100644 --- a/src/ruby/ext/grpc/rb_channel.h +++ b/src/ruby/ext/grpc/rb_channel.h @@ -41,6 +41,8 @@ /* Initializes the Channel class. */ void Init_grpc_channel(); +void grpc_rb_channel_polling_thread_start(); + /* Gets the wrapped channel from the ruby wrapper */ grpc_channel* grpc_rb_get_wrapped_channel(VALUE v); diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c index f30426b26d..db713ed821 100644 --- a/src/ruby/ext/grpc/rb_channel_credentials.c +++ b/src/ruby/ext/grpc/rb_channel_credentials.c @@ -161,6 +161,9 @@ static VALUE grpc_rb_channel_credentials_init(int argc, VALUE *argv, grpc_ssl_pem_key_cert_pair key_cert_pair; const char *pem_root_certs_cstr = NULL; MEMZERO(&key_cert_pair, grpc_ssl_pem_key_cert_pair, 1); + + grpc_ruby_once_init(); + /* "03" == no mandatory arg, 3 optional */ rb_scan_args(argc, argv, "03", &pem_root_certs, &pem_private_key, &pem_cert_chain); diff --git a/src/ruby/ext/grpc/rb_compression_options.c b/src/ruby/ext/grpc/rb_compression_options.c index 7de3c3c528..45c963dca6 100644 --- a/src/ruby/ext/grpc/rb_compression_options.c +++ b/src/ruby/ext/grpc/rb_compression_options.c @@ -100,8 +100,11 @@ static rb_data_type_t grpc_rb_compression_options_data_type = { Allocate the wrapped grpc compression options and initialize it here too. */ static VALUE grpc_rb_compression_options_alloc(VALUE cls) { - grpc_rb_compression_options *wrapper = - gpr_malloc(sizeof(grpc_rb_compression_options)); + grpc_rb_compression_options *wrapper = NULL; + + grpc_ruby_once_init(); + + wrapper = gpr_malloc(sizeof(grpc_rb_compression_options)); wrapper->wrapped = NULL; wrapper->wrapped = gpr_malloc(sizeof(grpc_compression_options)); grpc_compression_options_init(wrapper->wrapped); diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c index d45a5eaef8..5be8861e0c 100644 --- a/src/ruby/ext/grpc/rb_grpc.c +++ b/src/ruby/ext/grpc/rb_grpc.c @@ -47,6 +47,7 @@ #include "rb_channel.h" #include "rb_channel_credentials.h" #include "rb_compression_options.h" +#include "rb_event_thread.h" #include "rb_loader.h" #include "rb_server.h" #include "rb_server_credentials.h" @@ -289,17 +290,14 @@ VALUE sym_metadata = Qundef; static gpr_once g_once_init = GPR_ONCE_INIT; -static void grpc_ruby_once_init() { +static void grpc_ruby_once_init_internal() { grpc_init(); + grpc_rb_event_queue_thread_start(); + grpc_rb_channel_polling_thread_start(); atexit(grpc_rb_shutdown); } -void Init_grpc_c() { - if (!grpc_rb_load_core()) { - rb_raise(rb_eLoadError, "Couldn't find or load gRPC's dynamic C core"); - return; - } - +void grpc_ruby_once_init() { /* ruby_vm_at_exit doesn't seem to be working. It would crash once every * blue moon, and some users are getting it repeatedly. See the discussions * - https://github.com/grpc/grpc/pull/5337 @@ -310,7 +308,14 @@ void Init_grpc_c() { * then loaded again by another VM within the same process, we need to * schedule our initialization and destruction only once. */ - gpr_once_init(&g_once_init, grpc_ruby_once_init); + gpr_once_init(&g_once_init, grpc_ruby_once_init_internal); +} + +void Init_grpc_c() { + if (!grpc_rb_load_core()) { + rb_raise(rb_eLoadError, "Couldn't find or load gRPC's dynamic C core"); + return; + } grpc_rb_mGRPC = rb_define_module("GRPC"); grpc_rb_mGrpcCore = rb_define_module_under(grpc_rb_mGRPC, "Core"); diff --git a/src/ruby/ext/grpc/rb_grpc.h b/src/ruby/ext/grpc/rb_grpc.h index 2ee1faaca5..8538a74211 100644 --- a/src/ruby/ext/grpc/rb_grpc.h +++ b/src/ruby/ext/grpc/rb_grpc.h @@ -82,4 +82,6 @@ VALUE grpc_rb_cannot_init_copy(VALUE copy, VALUE self); /* grpc_rb_time_timeval creates a gpr_timespec from a ruby time object. */ gpr_timespec grpc_rb_time_timeval(VALUE time, int interval); +void grpc_ruby_once_init(); + #endif /* GRPC_RB_H_ */ diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index f189e88198..1c68b2dc7c 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -108,6 +108,7 @@ grpc_channel_create_call_type grpc_channel_create_call_import; grpc_channel_ping_type grpc_channel_ping_import; grpc_channel_register_call_type grpc_channel_register_call_import; grpc_channel_create_registered_call_type grpc_channel_create_registered_call_import; +grpc_call_arena_alloc_type grpc_call_arena_alloc_import; grpc_call_start_batch_type grpc_call_start_batch_import; grpc_call_get_peer_type grpc_call_get_peer_import; grpc_census_call_set_context_type grpc_census_call_set_context_import; @@ -119,7 +120,8 @@ grpc_lame_client_channel_create_type grpc_lame_client_channel_create_import; grpc_channel_destroy_type grpc_channel_destroy_import; grpc_call_cancel_type grpc_call_cancel_import; grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import; -grpc_call_destroy_type grpc_call_destroy_import; +grpc_call_ref_type grpc_call_ref_import; +grpc_call_unref_type grpc_call_unref_import; grpc_server_request_call_type grpc_server_request_call_import; grpc_server_register_method_type grpc_server_register_method_import; grpc_server_request_registered_call_type grpc_server_request_registered_call_import; @@ -404,6 +406,7 @@ void grpc_rb_load_imports(HMODULE library) { grpc_channel_ping_import = (grpc_channel_ping_type) GetProcAddress(library, "grpc_channel_ping"); grpc_channel_register_call_import = (grpc_channel_register_call_type) GetProcAddress(library, "grpc_channel_register_call"); grpc_channel_create_registered_call_import = (grpc_channel_create_registered_call_type) GetProcAddress(library, "grpc_channel_create_registered_call"); + grpc_call_arena_alloc_import = (grpc_call_arena_alloc_type) GetProcAddress(library, "grpc_call_arena_alloc"); grpc_call_start_batch_import = (grpc_call_start_batch_type) GetProcAddress(library, "grpc_call_start_batch"); grpc_call_get_peer_import = (grpc_call_get_peer_type) GetProcAddress(library, "grpc_call_get_peer"); grpc_census_call_set_context_import = (grpc_census_call_set_context_type) GetProcAddress(library, "grpc_census_call_set_context"); @@ -415,7 +418,8 @@ void grpc_rb_load_imports(HMODULE library) { grpc_channel_destroy_import = (grpc_channel_destroy_type) GetProcAddress(library, "grpc_channel_destroy"); grpc_call_cancel_import = (grpc_call_cancel_type) GetProcAddress(library, "grpc_call_cancel"); grpc_call_cancel_with_status_import = (grpc_call_cancel_with_status_type) GetProcAddress(library, "grpc_call_cancel_with_status"); - grpc_call_destroy_import = (grpc_call_destroy_type) GetProcAddress(library, "grpc_call_destroy"); + grpc_call_ref_import = (grpc_call_ref_type) GetProcAddress(library, "grpc_call_ref"); + grpc_call_unref_import = (grpc_call_unref_type) GetProcAddress(library, "grpc_call_unref"); grpc_server_request_call_import = (grpc_server_request_call_type) GetProcAddress(library, "grpc_server_request_call"); grpc_server_register_method_import = (grpc_server_register_method_type) GetProcAddress(library, "grpc_server_register_method"); grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index dcdc8063a2..c33fdd210e 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -275,6 +275,9 @@ extern grpc_channel_register_call_type grpc_channel_register_call_import; typedef grpc_call *(*grpc_channel_create_registered_call_type)(grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *completion_queue, void *registered_call_handle, gpr_timespec deadline, void *reserved); extern grpc_channel_create_registered_call_type grpc_channel_create_registered_call_import; #define grpc_channel_create_registered_call grpc_channel_create_registered_call_import +typedef void *(*grpc_call_arena_alloc_type)(grpc_call *call, size_t size); +extern grpc_call_arena_alloc_type grpc_call_arena_alloc_import; +#define grpc_call_arena_alloc grpc_call_arena_alloc_import typedef grpc_call_error(*grpc_call_start_batch_type)(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved); extern grpc_call_start_batch_type grpc_call_start_batch_import; #define grpc_call_start_batch grpc_call_start_batch_import @@ -308,9 +311,12 @@ extern grpc_call_cancel_type grpc_call_cancel_import; typedef grpc_call_error(*grpc_call_cancel_with_status_type)(grpc_call *call, grpc_status_code status, const char *description, void *reserved); extern grpc_call_cancel_with_status_type grpc_call_cancel_with_status_import; #define grpc_call_cancel_with_status grpc_call_cancel_with_status_import -typedef void(*grpc_call_destroy_type)(grpc_call *call); -extern grpc_call_destroy_type grpc_call_destroy_import; -#define grpc_call_destroy grpc_call_destroy_import +typedef void(*grpc_call_ref_type)(grpc_call *call); +extern grpc_call_ref_type grpc_call_ref_import; +#define grpc_call_ref grpc_call_ref_import +typedef void(*grpc_call_unref_type)(grpc_call *call); +extern grpc_call_unref_type grpc_call_unref_import; +#define grpc_call_unref grpc_call_unref_import typedef grpc_call_error(*grpc_server_request_call_type)(grpc_server *server, grpc_call **call, grpc_call_details *details, grpc_metadata_array *request_metadata, grpc_completion_queue *cq_bound_to_call, grpc_completion_queue *cq_for_notification, void *tag_new); extern grpc_server_request_call_type grpc_server_request_call_import; #define grpc_server_request_call grpc_server_request_call_import diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index ef57d5b07e..d7408f683d 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -132,11 +132,15 @@ static VALUE grpc_rb_server_alloc(VALUE cls) { Initializes server instances. */ static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) { - grpc_completion_queue *cq = grpc_completion_queue_create_for_pluck(NULL); + grpc_completion_queue *cq = NULL; grpc_rb_server *wrapper = NULL; grpc_server *srv = NULL; grpc_channel_args args; MEMZERO(&args, grpc_channel_args, 1); + + grpc_ruby_once_init(); + + cq = grpc_completion_queue_create_for_pluck(NULL); TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, wrapper); grpc_rb_hash_convert_to_channel_args(channel_args, &args); |