diff options
author | Sree Kuchibhotla <sreecha@users.noreply.github.com> | 2017-04-14 07:59:18 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-14 07:59:18 -0700 |
commit | b81fb794a397b053df0d4bed7b1525a0ff51535f (patch) | |
tree | 45a4df4c665d7de3e04363af5ec6176427abc8f8 /src/core | |
parent | 90bb7bddd138bac7126b01bfcb31fa9beff7d2fb (diff) | |
parent | 0e5fa9ebcd69746c2f27339a7cc13c3da2cde012 (diff) |
Merge pull request #9972 from sreecha/cq_create_api_changes
Completion Queue creation API changes (C-Core and C++ only)
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 48 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 6 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue_factory.c | 33 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 9 | ||||
-rw-r--r-- | src/core/lib/surface/version.c | 2 |
5 files changed, 83 insertions, 15 deletions
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 3273addf1d..35e9f7eb30 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -64,6 +64,10 @@ typedef struct { struct grpc_completion_queue { /** owned by pollset */ gpr_mu *mu; + + grpc_cq_completion_type completion_type; + grpc_cq_polling_type polling_type; + /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -79,6 +83,7 @@ struct grpc_completion_queue { int shutdown_called; int is_server_cq; /** Can the server cq accept incoming channels */ + /* TODO: sreek - This will no longer be needed. Use polling_type set */ int is_non_listening_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; @@ -110,13 +115,17 @@ int grpc_cq_event_timeout_trace; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, grpc_error *error); -grpc_completion_queue *grpc_completion_queue_create(void *reserved) { +grpc_completion_queue *grpc_completion_queue_create_internal( + grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type) { grpc_completion_queue *cc; - GPR_ASSERT(!reserved); - GPR_TIMER_BEGIN("grpc_completion_queue_create", 0); + GPR_TIMER_BEGIN("grpc_completion_queue_create_internal", 0); - GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); + GRPC_API_TRACE( + "grpc_completion_queue_create_internal(completion_type=%d, " + "polling_type=%d)", + 2, (completion_type, polling_type)); cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); @@ -125,6 +134,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->outstanding_tag_capacity = 0; #endif + cc->completion_type = completion_type; + cc->polling_type = polling_type; + /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ @@ -143,11 +155,19 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); - GPR_TIMER_END("grpc_completion_queue_create", 0); + GPR_TIMER_END("grpc_completion_queue_create_internal", 0); return cc; } +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { + return cc->completion_type; +} + +grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) { + return cc->polling_type; +} + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { @@ -347,6 +367,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_event ret; gpr_timespec now; + if (cc->completion_type != GRPC_CQ_NEXT) { + gpr_log(GPR_ERROR, + "grpc_completion_queue_next() cannot be called on this completion " + "queue since its completion type is not GRPC_CQ_NEXT"); + abort(); + } + GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GRPC_API_TRACE( @@ -516,6 +543,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); + if (cc->completion_type != GRPC_CQ_PLUCK) { + gpr_log(GPR_ERROR, + "grpc_completion_queue_pluck() cannot be called on this completion " + "queue since its completion type is not GRPC_CQ_PLUCK"); + abort(); + } + if (grpc_cq_pluck_trace) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" @@ -680,10 +714,14 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { } void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { + /* TODO: sreek - use cc->polling_type field here and add a validation check + (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose + polling_type is set to GRPC_CQ_NON_LISTENING */ cc->is_non_listening_server_cq = 1; } bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { + /* TODO (sreek) - return (cc->polling_type == GRPC_CQ_NON_LISTENING) */ return (cc->is_non_listening_server_cq == 1); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 5d73dd7216..1ff3d64293 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -99,4 +99,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); +grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc); + +grpc_completion_queue *grpc_completion_queue_create_internal( + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); + #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/completion_queue_factory.c b/src/core/lib/surface/completion_queue_factory.c index db67a5192b..d68b84eddd 100644 --- a/src/core/lib/surface/completion_queue_factory.c +++ b/src/core/lib/surface/completion_queue_factory.c @@ -36,12 +36,15 @@ #include <grpc/support/log.h> -/* TODO (sreek) - Currently this does not use the attributes arg. This will be - added in a future PR */ +/* + * == Default completion queue factory implementation == + */ + static grpc_completion_queue* default_create( const grpc_completion_queue_factory* factory, - const grpc_completion_queue_attributes* attributes) { - return grpc_completion_queue_create(NULL); + const grpc_completion_queue_attributes* attr) { + return grpc_completion_queue_create_internal(attr->cq_completion_type, + attr->cq_polling_type); } static grpc_completion_queue_factory_vtable default_vtable = {default_create}; @@ -49,19 +52,24 @@ static grpc_completion_queue_factory_vtable default_vtable = {default_create}; static const grpc_completion_queue_factory g_default_cq_factory = { "Default Factory", NULL, &default_vtable}; +/* + * == Completion queue factory APIs + */ + const grpc_completion_queue_factory* grpc_completion_queue_factory_lookup( const grpc_completion_queue_attributes* attributes) { - /* As we add more fields to grpc_completion_queue_attributes, we may have to - change this assert to: - GPR_ASSERT (attributes->version >= 1 && - attributes->version <= GRPC_CQ_CURRENT_VERSION) */ - GPR_ASSERT(attributes->version == 1); + GPR_ASSERT(attributes->version >= 1 && + attributes->version <= GRPC_CQ_CURRENT_VERSION); /* The default factory can handle version 1 of the attributes structure. We may have to change this as more fields are added to the structure */ return &g_default_cq_factory; } +/* + * == Completion queue creation APIs == + */ + grpc_completion_queue* grpc_completion_queue_create_for_next(void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = {1, GRPC_CQ_NEXT, @@ -75,3 +83,10 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { GRPC_CQ_DEFAULT_POLLING}; return g_default_cq_factory.vtable->create(&g_default_cq_factory, &attr); } + +grpc_completion_queue* grpc_completion_queue_create( + const grpc_completion_queue_factory* factory, + const grpc_completion_queue_attributes* attr, void* reserved) { + GPR_ASSERT(!reserved); + return factory->vtable->create(factory, attr); +} diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 191ee75252..9496f90390 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1000,6 +1000,15 @@ void grpc_server_register_completion_queue(grpc_server *server, GRPC_API_TRACE( "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, (server, cq, reserved)); + + if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) { + gpr_log(GPR_INFO, + "Completion queue which is not of type GRPC_CQ_NEXT is being " + "registered as a server-completion-queue"); + /* Ideally we should log an error and abort but ruby-wrapped-language API + calls grpc_completion_queue_pluck() on server completion queues */ + } + register_completion_queue(server, cq, false, reserved); } diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index ba80bd801e..3793845559 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -36,6 +36,6 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "3.0.0-dev"; } +const char *grpc_version_string(void) { return "4.0.0-dev"; } const char *grpc_g_stands_for(void) { return "gentle"; } |