diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 186 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 6 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 19 | ||||
-rw-r--r-- | src/cpp/common/core_codegen.cc | 12 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 32 | ||||
-rw-r--r-- | src/node/ext/server_generic.cc | 8 | ||||
-rw-r--r-- | src/proto/grpc/health/v1/BUILD | 39 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi | 3 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi | 11 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.c | 2 | ||||
-rw-r--r-- | src/ruby/ext/grpc/rb_grpc_imports.generated.h | 3 |
11 files changed, 247 insertions, 74 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 35e9f7eb30..eae3f103b1 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -60,13 +60,154 @@ typedef struct { void *tag; } plucker; +typedef struct { + bool can_get_pollset; + bool can_listen; + size_t (*size)(void); + void (*init)(grpc_pollset *pollset, gpr_mu **mu); + grpc_error *(*kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); + grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure); + void (*destroy)(grpc_pollset *pollset); +} cq_poller_vtable; + +typedef struct non_polling_worker { + gpr_cv cv; + bool kicked; + struct non_polling_worker *next; + struct non_polling_worker *prev; +} non_polling_worker; + +typedef struct { + gpr_mu mu; + non_polling_worker *root; + grpc_closure *shutdown; +} non_polling_poller; + +static size_t non_polling_poller_size(void) { + return sizeof(non_polling_poller); +} + +static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_init(&npp->mu); + *mu = &npp->mu; +} + +static void non_polling_poller_destroy(grpc_pollset *pollset) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_destroy(&npp->mu); +} + +static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker **worker, + gpr_timespec now, + gpr_timespec deadline) { + non_polling_poller *npp = (non_polling_poller *)pollset; + if (npp->shutdown) return GRPC_ERROR_NONE; + non_polling_worker w; + gpr_cv_init(&w.cv); + if (worker != NULL) *worker = (grpc_pollset_worker *)&w; + if (npp->root == NULL) { + npp->root = w.next = w.prev = &w; + } else { + w.next = npp->root; + w.prev = w.next->prev; + w.next->prev = w.prev->next = &w; + } + w.kicked = false; + while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline)) + ; + if (&w == npp->root) { + npp->root = w.next; + if (&w == npp->root) { + if (npp->shutdown) { + grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); + } + npp->root = NULL; + } + } + w.next->prev = w.prev; + w.prev->next = w.next; + gpr_cv_destroy(&w.cv); + if (worker != NULL) *worker = NULL; + return GRPC_ERROR_NONE; +} + +static grpc_error *non_polling_poller_kick( + grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + non_polling_poller *p = (non_polling_poller *)pollset; + if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root; + if (specific_worker != NULL) { + non_polling_worker *w = (non_polling_worker *)specific_worker; + if (!w->kicked) { + w->kicked = true; + gpr_cv_signal(&w->cv); + } + } + return GRPC_ERROR_NONE; +} + +static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_closure *closure) { + non_polling_poller *p = (non_polling_poller *)pollset; + GPR_ASSERT(closure != NULL); + p->shutdown = closure; + if (p->root == NULL) { + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + } else { + non_polling_worker *w = p->root; + do { + gpr_cv_signal(&w->cv); + w = w->next; + } while (w != p->root); + } +} + +static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { + /* GRPC_CQ_DEFAULT_POLLING */ + {.can_get_pollset = true, + .can_listen = true, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_LISTENING */ + {.can_get_pollset = true, + .can_listen = false, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_POLLING */ + {.can_get_pollset = false, + .can_listen = false, + .size = non_polling_poller_size, + .init = non_polling_poller_init, + .kick = non_polling_poller_kick, + .work = non_polling_poller_work, + .shutdown = non_polling_poller_shutdown, + .destroy = non_polling_poller_destroy}, +}; + /* Completion queue structure */ struct grpc_completion_queue { /** owned by pollset */ gpr_mu *mu; grpc_cq_completion_type completion_type; - grpc_cq_polling_type polling_type; + + const cq_poller_vtable *poller_vtable; /** completed events */ grpc_cq_completion completed_head; @@ -127,15 +268,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); - cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); - grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); + const cq_poller_vtable *poller_vtable = + &g_poller_vtable_by_poller_type[polling_type]; + + cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); + poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; #endif cc->completion_type = completion_type; - cc->polling_type = polling_type; + cc->poller_vtable = poller_vtable; /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); @@ -164,10 +308,6 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { return cc->completion_type; } -grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) { - return cc->polling_type; -} - #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { @@ -195,7 +335,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_destroy(POLLSET_FROM_CQ(cc)); + cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc)); #ifndef NDEBUG gpr_free(cc->outstanding_tags); #endif @@ -280,7 +420,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, } } grpc_error *kick_error = - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); gpr_mu_unlock(cc->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -295,8 +435,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); } @@ -452,8 +592,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(cc->mu); continue; } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, - now, iteration_deadline); + grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + NULL, now, iteration_deadline); if (err != GRPC_ERROR_NONE) { gpr_mu_unlock(cc->mu); const char *msg = grpc_error_string(err); @@ -644,8 +784,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(cc->mu); } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), - &worker, now, iteration_deadline); + grpc_error *err = cc->poller_vtable->work( + &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); if (err != GRPC_ERROR_NONE) { del_plucker(cc, tag, &worker); gpr_mu_unlock(cc->mu); @@ -689,8 +829,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } gpr_mu_unlock(cc->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -706,7 +846,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return POLLSET_FROM_CQ(cc); + return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; } grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { @@ -727,4 +867,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } -int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } +bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { + return cc->is_server_cq; +} + +bool grpc_cq_can_listen(grpc_completion_queue *cc) { + return cc->poller_vtable->can_listen; +} diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 1ff3d64293..a932087939 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -94,13 +94,11 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -int grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); -grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc); grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index e133d1d2a4..934ca0431a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -981,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = { static void register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - bool is_non_listening, void *reserved) { + void *reserved) { size_t i, n; GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { @@ -990,10 +990,6 @@ static void register_completion_queue(grpc_server *server, grpc_cq_mark_server_cq(cq); - if (is_non_listening) { - grpc_cq_mark_non_listening_server_cq(cq); - } - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1016,16 +1012,7 @@ void grpc_server_register_completion_queue(grpc_server *server, calls grpc_completion_queue_pluck() on server completion queues */ } - register_completion_queue(server, cq, false, reserved); -} - -void grpc_server_register_non_listening_completion_queue( - grpc_server *server, grpc_completion_queue *cq, void *reserved) { - GRPC_API_TRACE( - "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " - "reserved=%p)", - 3, (server, cq, reserved)); - register_completion_queue(server, cq, true, reserved); + register_completion_queue(server, cq, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { @@ -1121,7 +1108,7 @@ void grpc_server_start(grpc_server *server) { server->requested_calls_per_cq = gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { - if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { + if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 4f8b826aec..f679a33945 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -54,6 +54,18 @@ struct grpc_byte_buffer; namespace grpc { +const grpc_completion_queue_factory* +CoreCodegen::grpc_completion_queue_factory_lookup( + const grpc_completion_queue_attributes* attributes) { + return ::grpc_completion_queue_factory_lookup(attributes); +} + +grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attributes, void* reserved) { + return ::grpc_completion_queue_create(factory, attributes, reserved); +} + grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next( void* reserved) { return ::grpc_completion_queue_create_for_next(reserved); diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index 3a15afc49e..2ead048a1f 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -83,7 +83,8 @@ ServerBuilder::~ServerBuilder() { std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue( bool is_frequently_polled) { - ServerCompletionQueue* cq = new ServerCompletionQueue(is_frequently_polled); + ServerCompletionQueue* cq = new ServerCompletionQueue( + is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING); cqs_.push_back(cq); return std::unique_ptr<ServerCompletionQueue>(cq); } @@ -242,6 +243,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_cqs(std::make_shared< std::vector<std::unique_ptr<ServerCompletionQueue>>>()); + int num_frequently_polled_cqs = 0; + for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { + if ((*it)->IsFrequentlyPolled()) { + num_frequently_polled_cqs++; + } + } + + const bool is_hybrid_server = + has_sync_methods && num_frequently_polled_cqs > 0; + if (has_sync_methods) { // This is a Sync server gpr_log(GPR_INFO, @@ -251,9 +262,12 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec); + grpc_cq_polling_type polling_type = + is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING; + // Create completion queues to listen to incoming rpc requests for (int i = 0; i < sync_server_settings_.num_cqs; i++) { - sync_server_cqs->emplace_back(new ServerCompletionQueue()); + sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type)); } } @@ -269,12 +283,10 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // server // 2. cqs_: Completion queues added via AddCompletionQueue() call - // All sync cqs (if any) are frequently polled by ThreadManager - int num_frequently_polled_cqs = sync_server_cqs->size(); - for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) { grpc_server_register_completion_queue(server->server_, (*it)->cq(), nullptr); + num_frequently_polled_cqs++; } // cqs_ contains the completion queue added by calling the ServerBuilder's @@ -283,14 +295,8 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { // listening to incoming channels. Such completion queues must be registered // as non-listening queues for (auto it = cqs_.begin(); it != cqs_.end(); ++it) { - if ((*it)->IsFrequentlyPolled()) { - grpc_server_register_completion_queue(server->server_, (*it)->cq(), - nullptr); - num_frequently_polled_cqs++; - } else { - grpc_server_register_non_listening_completion_queue(server->server_, - (*it)->cq(), nullptr); - } + grpc_server_register_completion_queue(server->server_, (*it)->cq(), + nullptr); } if (num_frequently_polled_cqs == 0) { diff --git a/src/node/ext/server_generic.cc b/src/node/ext/server_generic.cc index 24573bd52f..088273d527 100644 --- a/src/node/ext/server_generic.cc +++ b/src/node/ext/server_generic.cc @@ -44,9 +44,11 @@ namespace grpc { namespace node { Server::Server(grpc_server *server) : wrapped_server(server) { - shutdown_queue = grpc_completion_queue_create_for_pluck(NULL); - grpc_server_register_non_listening_completion_queue(server, shutdown_queue, - NULL); + grpc_completion_queue_attributes attrs = { + GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_NON_LISTENING}; + shutdown_queue = grpc_completion_queue_create( + grpc_completion_queue_factory_lookup(&attrs), &attrs, NULL); + grpc_server_register_completion_queue(server, shutdown_queue, NULL); } Server::~Server() { diff --git a/src/proto/grpc/health/v1/BUILD b/src/proto/grpc/health/v1/BUILD new file mode 100644 index 0000000000..dbb91d9139 --- /dev/null +++ b/src/proto/grpc/health/v1/BUILD @@ -0,0 +1,39 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +licenses(["notice"]) # 3-clause BSD + +package(default_visibility = ["//visibility:public"]) + +load("//bazel:grpc_build_system.bzl", "grpc_proto_library") + +grpc_proto_library( + name = "health_proto", + srcs = ["health.proto"], +) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 4f1776e002..f66f6e4122 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -356,8 +356,6 @@ cdef extern from "grpc/grpc.h": void grpc_server_register_completion_queue(grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil - void grpc_server_register_non_listening_completion_queue( - grpc_server *server, grpc_completion_queue *cq, void *reserved) nogil int grpc_server_add_insecure_http2_port( grpc_server *server, const char *addr) nogil void grpc_server_start(grpc_server *server) nogil @@ -502,4 +500,3 @@ cdef extern from "grpc/compression.h": int grpc_compression_options_is_algorithm_enabled( const grpc_compression_options *opts, grpc_compression_algorithm algorithm) nogil - diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi index 18db38b686..97192efda7 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi @@ -82,20 +82,11 @@ cdef class Server: self.c_server, queue.c_completion_queue, NULL) self.registered_completion_queues.append(queue) - def register_non_listening_completion_queue( - self, CompletionQueue queue not None): - if self.is_started: - raise ValueError("cannot register completion queues after start") - with nogil: - grpc_server_register_non_listening_completion_queue( - self.c_server, queue.c_completion_queue, NULL) - self.registered_completion_queues.append(queue) - def start(self): if self.is_started: raise ValueError("the server has already started") self.backup_shutdown_queue = CompletionQueue() - self.register_non_listening_completion_queue(self.backup_shutdown_queue) + self.register_completion_queue(self.backup_shutdown_queue) self.is_started = True with nogil: grpc_server_start(self.c_server) diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c index 1c68b2dc7c..a412277f6d 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c @@ -127,7 +127,6 @@ grpc_server_register_method_type grpc_server_register_method_import; grpc_server_request_registered_call_type grpc_server_request_registered_call_import; grpc_server_create_type grpc_server_create_import; grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; -grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; grpc_server_start_type grpc_server_start_import; grpc_server_shutdown_and_notify_type grpc_server_shutdown_and_notify_import; @@ -425,7 +424,6 @@ void grpc_rb_load_imports(HMODULE library) { grpc_server_request_registered_call_import = (grpc_server_request_registered_call_type) GetProcAddress(library, "grpc_server_request_registered_call"); grpc_server_create_import = (grpc_server_create_type) GetProcAddress(library, "grpc_server_create"); grpc_server_register_completion_queue_import = (grpc_server_register_completion_queue_type) GetProcAddress(library, "grpc_server_register_completion_queue"); - grpc_server_register_non_listening_completion_queue_import = (grpc_server_register_non_listening_completion_queue_type) GetProcAddress(library, "grpc_server_register_non_listening_completion_queue"); grpc_server_add_insecure_http2_port_import = (grpc_server_add_insecure_http2_port_type) GetProcAddress(library, "grpc_server_add_insecure_http2_port"); grpc_server_start_import = (grpc_server_start_type) GetProcAddress(library, "grpc_server_start"); grpc_server_shutdown_and_notify_import = (grpc_server_shutdown_and_notify_type) GetProcAddress(library, "grpc_server_shutdown_and_notify"); diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index c33fdd210e..7df8dd69fc 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -332,9 +332,6 @@ extern grpc_server_create_type grpc_server_create_import; typedef void(*grpc_server_register_completion_queue_type)(grpc_server *server, grpc_completion_queue *cq, void *reserved); extern grpc_server_register_completion_queue_type grpc_server_register_completion_queue_import; #define grpc_server_register_completion_queue grpc_server_register_completion_queue_import -typedef void(*grpc_server_register_non_listening_completion_queue_type)(grpc_server *server, grpc_completion_queue *q, void *reserved); -extern grpc_server_register_non_listening_completion_queue_type grpc_server_register_non_listening_completion_queue_import; -#define grpc_server_register_non_listening_completion_queue grpc_server_register_non_listening_completion_queue_import typedef int(*grpc_server_add_insecure_http2_port_type)(grpc_server *server, const char *addr); extern grpc_server_add_insecure_http2_port_type grpc_server_add_insecure_http2_port_import; #define grpc_server_add_insecure_http2_port grpc_server_add_insecure_http2_port_import |