diff options
115 files changed, 825 insertions, 373 deletions
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h index 201e52ae07..bf18a7d644 100644 --- a/include/grpc++/impl/codegen/client_unary_call.h +++ b/include/grpc++/impl/codegen/client_unary_call.h @@ -52,7 +52,7 @@ template <class InputMessage, class OutputMessage> Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const InputMessage& request, OutputMessage* result) { - CompletionQueue cq; + CompletionQueue cq(GRPC_CQ_PLUCK, DEFAULT_POLLING); Call call(channel->CreateCall(method, context, &cq)); CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>, diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h index 03cecdc21c..a123af2cf6 100644 --- a/include/grpc++/impl/codegen/completion_queue.h +++ b/include/grpc++/impl/codegen/completion_queue.h @@ -52,6 +52,7 @@ #include <grpc++/impl/codegen/grpc_library.h> #include <grpc++/impl/codegen/status.h> #include <grpc++/impl/codegen/time.h> +#include <grpc/grpc.h> #include <grpc/impl/codegen/atm.h> struct grpc_completion_queue; @@ -102,10 +103,7 @@ class CompletionQueue : private GrpcLibraryCodegen { public: /// Default constructor. Implicitly creates a \a grpc_completion_queue /// instance. - CompletionQueue() { - cq_ = g_core_codegen_interface->grpc_completion_queue_create(nullptr); - InitialAvalanching(); // reserve this for the future shutdown - } + CompletionQueue() : CompletionQueue(GRPC_CQ_NEXT, DEFAULT_POLLING) {} /// Wrap \a take, taking ownership of the instance. /// @@ -149,8 +147,9 @@ class CompletionQueue : private GrpcLibraryCodegen { /// /// \return true if read a regular event, false if the queue is shutting down. bool Next(void** tag, bool* ok) { - return (AsyncNextInternal(tag, ok, g_core_codegen_interface->gpr_inf_future( - GPR_CLOCK_REALTIME)) != SHUTDOWN); + return (AsyncNextInternal(tag, ok, + g_core_codegen_interface->gpr_inf_future( + GPR_CLOCK_REALTIME)) != SHUTDOWN); } /// Request the shutdown of the queue. @@ -218,6 +217,14 @@ class CompletionQueue : private GrpcLibraryCodegen { const InputMessage& request, OutputMessage* result); + /// Private constructor of CompletionQueue only visible to friend classes + CompletionQueue(grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type) { + cq_ = g_core_codegen_interface->grpc_completion_queue_create( + completion_type, polling_type, nullptr); + InitialAvalanching(); // reserve this for the future shutdown + } + NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline); /// Wraps \a grpc_completion_queue_pluck. diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h index 754bf14b25..4c8567f770 100644 --- a/include/grpc++/impl/codegen/core_codegen.h +++ b/include/grpc++/impl/codegen/core_codegen.h @@ -38,6 +38,7 @@ #include <grpc++/impl/codegen/core_codegen_interface.h> #include <grpc/byte_buffer.h> +#include <grpc/grpc.h> #include <grpc/impl/codegen/grpc_types.h> namespace grpc { @@ -45,7 +46,9 @@ namespace grpc { /// Implementation of the core codegen interface. class CoreCodegen : public CoreCodegenInterface { private: - grpc_completion_queue* grpc_completion_queue_create(void* reserved) override; + grpc_completion_queue* grpc_completion_queue_create( + grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, void* reserved) override; void grpc_completion_queue_destroy(grpc_completion_queue* cq) override; grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h index 45ea040303..8f2b4a3b76 100644 --- a/include/grpc++/impl/codegen/core_codegen_interface.h +++ b/include/grpc++/impl/codegen/core_codegen_interface.h @@ -36,6 +36,7 @@ #include <grpc++/impl/codegen/config.h> #include <grpc++/impl/codegen/status.h> +#include <grpc/grpc.h> #include <grpc/impl/codegen/byte_buffer_reader.h> #include <grpc/impl/codegen/grpc_types.h> #include <grpc/impl/codegen/sync.h> @@ -60,7 +61,8 @@ class CoreCodegenInterface { int line) = 0; virtual grpc_completion_queue* grpc_completion_queue_create( - void* reserved) = 0; + grpc_cq_completion_type completion_type, + grpc_cq_polling_type polling_type, void* reserved) = 0; virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0; virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag, diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h index 4d9b074e95..d3a6c7ff89 100644 --- a/include/grpc++/impl/codegen/sync_stream.h +++ b/include/grpc++/impl/codegen/sync_stream.h @@ -137,7 +137,9 @@ class ClientReader final : public ClientReaderInterface<R> { template <class W> ClientReader(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, const W& request) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(GRPC_CQ_PLUCK, DEFAULT_POLLING), + call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose> ops; @@ -209,7 +211,9 @@ class ClientWriter : public ClientWriterInterface<W> { template <class R> ClientWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context, R* response) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(GRPC_CQ_PLUCK, DEFAULT_POLLING), + call_(channel->CreateCall(method, context, &cq_)) { finish_ops_.RecvMessage(response); finish_ops_.AllowNoMessage(); @@ -292,7 +296,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { /// Blocking create a stream. ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method, ClientContext* context) - : context_(context), call_(channel->CreateCall(method, context, &cq_)) { + : context_(context), + cq_(GRPC_CQ_PLUCK, DEFAULT_POLLING), + call_(channel->CreateCall(method, context, &cq_)) { CallOpSet<CallOpSendInitialMetadata> ops; ops.SendInitialMetadata(context->send_initial_metadata_, context->initial_metadata_flags()); diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 1b33d48c02..0e4711fa1f 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -93,8 +93,44 @@ GRPCAPI const char *grpc_version_string(void); /** Return a string specifying what the 'g' in gRPC stands for */ GRPCAPI const char *grpc_g_stands_for(void); +/** Specifies the type of APIs to use to pop events from the completion queue */ +typedef enum { + /** Events are popped out by calling grpc_completion_queue_next() API ONLY */ + GRPC_CQ_NEXT = 1, + /** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/ + GRPC_CQ_PLUCK +} grpc_cq_completion_type; + +/** Completion queues internally MAY maintain a set of file descriptors in a + structure called 'pollset'. This enum specifies if a completion queue has an + associated pollset and any restrictions on the type of file descriptors that + can be present in the pollset. + + I/O progress can only be made when grpc_completion_queue_next() or + grpc_completion_queue_pluck() are called on the completion queue (unless the + grpc_cq_polling_type is NON_POLLING) and hence it is very important to + actively call these APIs */ +typedef enum { + /** The completion queue will have an associated pollset and there is no + restriction on the type of file descriptors the pollset may contain */ + DEFAULT_POLLING, + + /** Similar to DEFAULT_POLLING except that the completion queues will not + contain any 'listening file descriptors' (i.e file descriptors used to + listen to incoming channels */ + NON_LISTENING, + + /** The completion queue will not have an associated pollset. Note that + grpc_completion_queue_next() or grpc_completion_queue_pluck() MUST still + be called to pop events from the completion queue; it is not required to + call them actively to make I/O progress */ + NON_POLLING +} grpc_cq_polling_type; + /** Create a completion queue */ -GRPCAPI grpc_completion_queue *grpc_completion_queue_create(void *reserved); +GRPCAPI grpc_completion_queue *grpc_completion_queue_create( + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, + void *reserved); /** Blocks until an event is available, the completion queue is being shut down, or deadline is reached. diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index e5c731304c..061f59213c 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -352,8 +352,11 @@ typedef enum grpc_completion_type { typedef struct grpc_event { /** The type of the completion. */ grpc_completion_type type; - /** non-zero if the operation was successful, 0 upon failure. - Only GRPC_OP_COMPLETE can succeed or fail. */ + /** If the grpc_completion_type is GRPC_OP_COMPLETE, this field indicates + whether the operation was successful or not; 0 in case of failure and + non-zero in case of success. + If grpc_completion_type is GRPC_QUEUE_SHUTDOWN or GRPC_QUEUE_TIMEOUT, this + field is guaranteed to be 0 */ int success; /** The tag passed to grpc_call_start_batch etc to start this operation. Only GRPC_OP_COMPLETE has a tag. */ diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b4594817e4..d45e265d5c 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -64,6 +64,10 @@ typedef struct { struct grpc_completion_queue { /** owned by pollset */ gpr_mu *mu; + + grpc_cq_completion_type completion_type; + grpc_cq_polling_type polling_type; + /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -79,6 +83,7 @@ struct grpc_completion_queue { int shutdown_called; int is_server_cq; /** Can the server cq accept incoming channels */ + /* TODO: sreek - This will no longer be needed. Use polling_type set */ int is_non_listening_server_cq; int num_pluckers; plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; @@ -110,7 +115,9 @@ int grpc_cq_event_timeout_trace; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, grpc_error *error); -grpc_completion_queue *grpc_completion_queue_create(void *reserved) { +grpc_completion_queue *grpc_completion_queue_create( + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, + void *reserved) { grpc_completion_queue *cc; GPR_ASSERT(!reserved); @@ -125,6 +132,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { cc->outstanding_tag_capacity = 0; #endif + cc->completion_type = completion_type; + cc->polling_type = polling_type; + /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); /* One for destroy(), one for pollset_shutdown */ @@ -148,6 +158,14 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { return cc; } +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { + return cc->completion_type; +} + +grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) { + return cc->polling_type; +} + #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { @@ -348,6 +366,13 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, grpc_pollset_worker *worker = NULL; gpr_timespec now; + if (cc->completion_type != GRPC_CQ_NEXT) { + gpr_log(GPR_ERROR, + "grpc_completion_queue_next() cannot be called on this completion " + "queue since its completion type is not GRPC_CQ_NEXT"); + abort(); + } + GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); GRPC_API_TRACE( @@ -356,8 +381,9 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 5, (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, - reserved)); + 5, + (cc, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, + reserved)); GPR_ASSERT(!reserved); dump_pending_tags(cc); @@ -517,6 +543,13 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0); + if (cc->completion_type != GRPC_CQ_PLUCK) { + gpr_log(GPR_ERROR, + "grpc_completion_queue_pluck() cannot be called on this completion " + "queue since its completion type is not GRPC_CQ_PLUCK"); + abort(); + } + if (grpc_cq_pluck_trace) { GRPC_API_TRACE( "grpc_completion_queue_pluck(" @@ -524,8 +557,9 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "deadline=gpr_timespec { tv_sec: %" PRId64 ", tv_nsec: %d, clock_type: %d }, " "reserved=%p)", - 6, (cc, tag, deadline.tv_sec, deadline.tv_nsec, - (int)deadline.clock_type, reserved)); + 6, + (cc, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type, + reserved)); } GPR_ASSERT(!reserved); @@ -681,10 +715,14 @@ grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { } void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc) { + /* TODO: sreek - use cc->polling_type field here and add a validation check + (i.e grpc_cq_mark_non_listening_server_cq can only be called on a cc whose + polling_type is set to NON_LISTENING */ cc->is_non_listening_server_cq = 1; } bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { + /* TODO (sreek) - return (cc->polling_type == NON_LISTENING) */ return (cc->is_non_listening_server_cq == 1); } diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 5d73dd7216..21f28e9424 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -99,4 +99,7 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); int grpc_cq_is_server_cq(grpc_completion_queue *cc); +grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); +grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc); + #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index b360579553..13f7321fac 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1001,6 +1001,14 @@ void grpc_server_register_completion_queue(grpc_server *server, GRPC_API_TRACE( "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3, (server, cq, reserved)); + + if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) { + gpr_log( + GPR_ERROR, + "Server completion queues must have a completion type of GRPC_CQ_NEXT"); + abort(); + } + register_completion_queue(server, cq, false, reserved); } @@ -1423,8 +1431,9 @@ grpc_call_error grpc_server_request_call( "grpc_server_request_call(" "server=%p, call=%p, details=%p, initial_metadata=%p, " "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)", - 7, (server, call, details, initial_metadata, cq_bound_to_call, - cq_for_notification, tag)); + 7, + (server, call, details, initial_metadata, cq_bound_to_call, + cq_for_notification, tag)); size_t cq_idx; for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { if (server->cqs[cq_idx] == cq_for_notification) { @@ -1466,8 +1475,9 @@ grpc_call_error grpc_server_request_registered_call( "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, " "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, " "tag=%p)", - 9, (server, rmp, call, deadline, initial_metadata, optional_payload, - cq_bound_to_call, cq_for_notification, tag)); + 9, + (server, rmp, call, deadline, initial_metadata, optional_payload, + cq_bound_to_call, cq_for_notification, tag)); size_t cq_idx; for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) { diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index 36e4c89354..81b32938b8 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -55,8 +55,10 @@ struct grpc_byte_buffer; namespace grpc { grpc_completion_queue* CoreCodegen::grpc_completion_queue_create( + grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, void* reserved) { - return ::grpc_completion_queue_create(reserved); + return ::grpc_completion_queue_create(completion_type, polling_type, + reserved); } void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) { diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc index 9e11a8a9e0..1b7ca82be0 100644 --- a/src/cpp/server/server_cc.cc +++ b/src/cpp/server/server_cc.cc @@ -124,6 +124,14 @@ class ShutdownTag : public CompletionQueueTag { bool FinalizeResult(void** tag, bool* status) { return false; } }; +class DummyTag : public CompletionQueueTag { + public: + bool FinalizeResult(void** tag, bool* status) { + *status = true; + return true; + } +}; + class Server::SyncRequest final : public CompletionQueueTag { public: SyncRequest(RpcServiceMethod* method, void* tag) @@ -145,7 +153,10 @@ class Server::SyncRequest final : public CompletionQueueTag { grpc_metadata_array_destroy(&request_metadata_); } - void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); } + void SetupRequest() { + // TODO: sreek - Double check if this should be GRPC_CQ_PLUCK + cq_ = grpc_completion_queue_create(GRPC_CQ_PLUCK, DEFAULT_POLLING, nullptr); + } void TeardownRequest() { grpc_completion_queue_destroy(cq_); @@ -213,10 +224,10 @@ class Server::SyncRequest final : public CompletionQueueTag { MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_)); global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; - void* ignored_tag; - bool ignored_ok; + DummyTag ignored_tag; cq_.Shutdown(); - GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false); + /* Ensure the cq_ is shutdown (else this will hang indefinitely) */ + GPR_ASSERT(cq_.Pluck(&ignored_tag) == false); } private: diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index b16e673878..8ec7f6fd95 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -224,7 +224,7 @@ extern grpc_version_string_type grpc_version_string_import; typedef const char *(*grpc_g_stands_for_type)(void); extern grpc_g_stands_for_type grpc_g_stands_for_import; #define grpc_g_stands_for grpc_g_stands_for_import -typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(void *reserved); +typedef grpc_completion_queue *(*grpc_completion_queue_create_type)(grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, void *reserved); extern grpc_completion_queue_create_type grpc_completion_queue_create_import; #define grpc_completion_queue_create grpc_completion_queue_create_import typedef grpc_event(*grpc_completion_queue_next_type)(grpc_completion_queue *cq, gpr_timespec deadline, void *reserved); diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index fdedfe284e..a6e859d06e 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -104,6 +104,7 @@ void grpc_run_bad_client_test( grpc_slice_buffer outgoing; grpc_closure done_write_closure; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_completion_queue *shutdown_cq; hex = gpr_dump(client_payload, client_payload_length, GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -124,7 +125,7 @@ void grpc_run_bad_client_test( /* Create server, completion events */ a.server = grpc_server_create(NULL, NULL); - a.cq = grpc_completion_queue_create(NULL); + a.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); gpr_event_init(&a.done_thd); gpr_event_init(&a.done_write); a.validator = server_validator; @@ -195,10 +196,13 @@ void grpc_run_bad_client_test( grpc_endpoint_destroy(&exec_ctx, sfd.client); grpc_exec_ctx_finish(&exec_ctx); } - grpc_server_shutdown_and_notify(a.server, a.cq, NULL); + + shutdown_cq = grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); + grpc_server_shutdown_and_notify(a.server, shutdown_cq, NULL); GPR_ASSERT(grpc_completion_queue_pluck( - a.cq, NULL, grpc_timeout_seconds_to_deadline(1), NULL) + shutdown_cq, NULL, grpc_timeout_seconds_to_deadline(1), NULL) .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); grpc_server_destroy(a.server); grpc_completion_queue_destroy(a.cq); grpc_slice_buffer_destroy_internal(&exec_ctx, &outgoing); diff --git a/test/core/bad_ssl/bad_ssl_test.c b/test/core/bad_ssl/bad_ssl_test.c index bd85585706..4d34f8a9ea 100644 --- a/test/core/bad_ssl/bad_ssl_test.c +++ b/test/core/bad_ssl/bad_ssl_test.c @@ -61,7 +61,8 @@ static void run_test(const char *target, size_t nops) { grpc_status_code status; grpc_call_error error; gpr_timespec deadline = grpc_timeout_seconds_to_deadline(5); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); cq_verifier *cqv = cq_verifier_create(cq); grpc_op ops[6]; diff --git a/test/core/bad_ssl/server_common.c b/test/core/bad_ssl/server_common.c index 6a4313eafd..383899d94a 100644 --- a/test/core/bad_ssl/server_common.c +++ b/test/core/bad_ssl/server_common.c @@ -66,7 +66,10 @@ void bad_ssl_run(grpc_server *server) { grpc_call *s = NULL; grpc_call_details call_details; grpc_metadata_array request_metadata_recv; - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + grpc_completion_queue *shutdown_cq; grpc_call_details_init(&call_details); grpc_metadata_array_init(&request_metadata_recv); @@ -82,16 +85,21 @@ void bad_ssl_run(grpc_server *server) { while (!shutdown_finished) { if (got_sigint && !shutdown_started) { gpr_log(GPR_INFO, "Shutting down due to SIGINT"); - grpc_server_shutdown_and_notify(server, cq, NULL); - GPR_ASSERT(grpc_completion_queue_pluck( - cq, NULL, grpc_timeout_seconds_to_deadline(5), NULL) - .type == GRPC_OP_COMPLETE); + shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); + grpc_server_shutdown_and_notify(server, shutdown_cq, NULL); + GPR_ASSERT( + grpc_completion_queue_pluck(shutdown_cq, NULL, + grpc_timeout_seconds_to_deadline(5), NULL) + .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); grpc_completion_queue_shutdown(cq); shutdown_started = 1; } ev = grpc_completion_queue_next( - cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000000, GPR_TIMESPAN)), + cq, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000, GPR_TIMESPAN)), NULL); switch (ev.type) { case GRPC_OP_COMPLETE: diff --git a/test/core/client_channel/lb_policies_test.c b/test/core/client_channel/lb_policies_test.c index 057b90ec84..9dcbdb979d 100644 --- a/test/core/client_channel/lb_policies_test.c +++ b/test/core/client_channel/lb_policies_test.c @@ -59,14 +59,15 @@ typedef struct servers_fixture { grpc_server **servers; grpc_call **server_calls; grpc_completion_queue *cq; + grpc_completion_queue *shutdown_cq; char **servers_hostports; grpc_metadata_array *request_metadata_recv; } servers_fixture; typedef struct request_sequences { - size_t n; /* number of iterations */ - int *connections; /* indexed by the interation number, value is the index of - the server it connected to or -1 if none */ + size_t n; /* number of iterations */ + int *connections; /* indexed by the interation number, value is the index of + the server it connected to or -1 if none */ int *connectivity_states; /* indexed by the interation number, value is the client connectivity state */ } request_sequences; @@ -146,10 +147,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void kill_server(const servers_fixture *f, size_t i) { gpr_log(GPR_INFO, "KILLING SERVER %" PRIuPTR, i); GPR_ASSERT(f->servers[i] != NULL); - grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); - GPR_ASSERT( - grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL) - .type == GRPC_OP_COMPLETE); + grpc_server_shutdown_and_notify(f->servers[i], f->shutdown_cq, tag(10000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(10000), + n_millis_time(5000), NULL) + .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); f->servers[i] = NULL; } @@ -196,7 +197,9 @@ static servers_fixture *setup_servers(const char *server_host, /* Create servers. */ f->servers = gpr_malloc(sizeof(grpc_server *) * num_servers); f->servers_hostports = gpr_malloc(sizeof(char *) * num_servers); - f->cq = grpc_completion_queue_create(NULL); + f->cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f->shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); for (i = 0; i < num_servers; i++) { grpc_metadata_array_init(&f->request_metadata_recv[i]); gpr_join_host_port(&f->servers_hostports[i], server_host, @@ -212,8 +215,8 @@ static void teardown_servers(servers_fixture *f) { /* Destroy server. */ for (i = 0; i < f->num_servers; i++) { if (f->servers[i] == NULL) continue; - grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000)); - GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000), + grpc_server_shutdown_and_notify(f->servers[i], f->shutdown_cq, tag(10000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(10000), n_millis_time(5000), NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->servers[i]); @@ -221,6 +224,7 @@ static void teardown_servers(servers_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); gpr_free(f->servers); diff --git a/test/core/client_channel/set_initial_connect_string_test.c b/test/core/client_channel/set_initial_connect_string_test.c index a0a33667cc..38f2fbac62 100644 --- a/test/core/client_channel/set_initial_connect_string_test.c +++ b/test/core/client_channel/set_initial_connect_string_test.c @@ -130,7 +130,7 @@ static gpr_timespec n_sec_deadline(int seconds) { } static void start_rpc(int use_creds, int target_port) { - state.cq = grpc_completion_queue_create(NULL); + state.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); if (use_creds) { state.creds = grpc_fake_transport_security_credentials_create(); } else { diff --git a/test/core/end2end/bad_server_response_test.c b/test/core/end2end/bad_server_response_test.c index 39a98e84ca..67f5bdbac8 100644 --- a/test/core/end2end/bad_server_response_test.c +++ b/test/core/end2end/bad_server_response_test.c @@ -178,7 +178,7 @@ static void start_rpc(int target_port, grpc_status_code expected_status, cq_verifier *cqv; grpc_slice details; - state.cq = grpc_completion_queue_create(NULL); + state.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); cqv = cq_verifier_create(state.cq); gpr_join_host_port(&state.target, "127.0.0.1", target_port); state.channel = grpc_insecure_channel_create(state.target, NULL, NULL); diff --git a/test/core/end2end/connection_refused_test.c b/test/core/end2end/connection_refused_test.c index 16a3005539..617828f954 100644 --- a/test/core/end2end/connection_refused_test.c +++ b/test/core/end2end/connection_refused_test.c @@ -69,7 +69,7 @@ static void run_test(bool wait_for_ready, bool use_service_config) { grpc_metadata_array_init(&trailing_metadata_recv); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); cqv = cq_verifier_create(cq); /* if using service config, create channel args */ diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 3623bd7be8..5125be9b94 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -76,6 +76,7 @@ void test_connect(const char *server_host, const char *client_host, int port, grpc_channel *client; grpc_server *server; grpc_completion_queue *cq; + grpc_completion_queue *shutdown_cq; grpc_call *c; grpc_call *s; cq_verifier *cqv; @@ -107,7 +108,7 @@ void test_connect(const char *server_host, const char *client_host, int port, grpc_call_details_init(&call_details); /* Create server. */ - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); server = grpc_server_create(NULL, NULL); grpc_server_register_completion_queue(server, cq, NULL); GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port( @@ -259,11 +260,14 @@ void test_connect(const char *server_host, const char *client_host, int port, grpc_channel_destroy(client); /* Destroy server. */ - grpc_server_shutdown_and_notify(server, cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + shutdown_cq = grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); + grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(server); + grpc_completion_queue_destroy(shutdown_cq); grpc_completion_queue_shutdown(cq); drain_cq(cq); grpc_completion_queue_destroy(cq); diff --git a/test/core/end2end/end2end_tests.h b/test/core/end2end/end2end_tests.h index cb0afd9cd9..ac87c55dc4 100644 --- a/test/core/end2end/end2end_tests.h +++ b/test/core/end2end/end2end_tests.h @@ -50,6 +50,7 @@ typedef struct grpc_end2end_test_config grpc_end2end_test_config; struct grpc_end2end_test_fixture { grpc_completion_queue *cq; + grpc_completion_queue *shutdown_cq; grpc_server *server; grpc_channel *client; void *fixture_data; diff --git a/test/core/end2end/fixtures/h2_census.c b/test/core/end2end/fixtures/h2_census.c index 8e60123ed6..a1fae013c2 100644 --- a/test/core/end2end/fixtures/h2_census.c +++ b/test/core/end2end/fixtures/h2_census.c @@ -65,7 +65,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -119,9 +121,10 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack+census", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack+census", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; diff --git a/test/core/end2end/fixtures/h2_compress.c b/test/core/end2end/fixtures/h2_compress.c index c01e45664b..7a2aa7ff7f 100644 --- a/test/core/end2end/fixtures/h2_compress.c +++ b/test/core/end2end/fixtures/h2_compress.c @@ -69,7 +69,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack_compression( memset(&f, 0, sizeof(f)); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -119,9 +121,10 @@ void chttp2_tear_down_fullstack_compression(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack_compression", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack_compression", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack_compression, chttp2_init_client_fullstack_compression, chttp2_init_server_fullstack_compression, diff --git a/test/core/end2end/fixtures/h2_fakesec.c b/test/core/end2end/fixtures/h2_fakesec.c index c9747913c2..0ccdef9767 100644 --- a/test/core/end2end/fixtures/h2_fakesec.c +++ b/test/core/end2end/fixtures/h2_fakesec.c @@ -60,7 +60,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } diff --git a/test/core/end2end/fixtures/h2_fd.c b/test/core/end2end/fixtures/h2_fd.c index 223fadc386..d97b693e33 100644 --- a/test/core/end2end/fixtures/h2_fd.c +++ b/test/core/end2end/fixtures/h2_fd.c @@ -70,7 +70,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_end2end_test_fixture f; memset(&f, 0, sizeof(f)); f.fixture_data = fixture_data; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); create_sockets(fixture_data->fd_pair); diff --git a/test/core/end2end/fixtures/h2_full+pipe.c b/test/core/end2end/fixtures/h2_full+pipe.c index c6013f3040..812922ff5b 100644 --- a/test/core/end2end/fixtures/h2_full+pipe.c +++ b/test/core/end2end/fixtures/h2_full+pipe.c @@ -70,7 +70,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -102,9 +104,10 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; diff --git a/test/core/end2end/fixtures/h2_full+trace.c b/test/core/end2end/fixtures/h2_full+trace.c index 01316376e0..02d794d08d 100644 --- a/test/core/end2end/fixtures/h2_full+trace.c +++ b/test/core/end2end/fixtures/h2_full+trace.c @@ -70,7 +70,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -102,9 +104,10 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; diff --git a/test/core/end2end/fixtures/h2_full.c b/test/core/end2end/fixtures/h2_full.c index 3399f1981e..bfb300e42f 100644 --- a/test/core/end2end/fixtures/h2_full.c +++ b/test/core/end2end/fixtures/h2_full.c @@ -64,7 +64,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -96,9 +98,10 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c index 44b223664a..0f8e002e56 100644 --- a/test/core/end2end/fixtures/h2_http_proxy.c +++ b/test/core/end2end/fixtures/h2_http_proxy.c @@ -69,7 +69,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( ffd->proxy = grpc_end2end_http_proxy_create(); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -107,9 +109,10 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; diff --git a/test/core/end2end/fixtures/h2_load_reporting.c b/test/core/end2end/fixtures/h2_load_reporting.c index 38321f34db..664c7ba9d4 100644 --- a/test/core/end2end/fixtures/h2_load_reporting.c +++ b/test/core/end2end/fixtures/h2_load_reporting.c @@ -67,7 +67,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_load_reporting( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } diff --git a/test/core/end2end/fixtures/h2_oauth2.c b/test/core/end2end/fixtures/h2_oauth2.c index 3351652858..73928759fe 100644 --- a/test/core/end2end/fixtures/h2_oauth2.c +++ b/test/core/end2end/fixtures/h2_oauth2.c @@ -113,7 +113,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } diff --git a/test/core/end2end/fixtures/h2_proxy.c b/test/core/end2end/fixtures/h2_proxy.c index 9e37ed4db3..016ba41fbd 100644 --- a/test/core/end2end/fixtures/h2_proxy.c +++ b/test/core/end2end/fixtures/h2_proxy.c @@ -79,7 +79,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( ffd->proxy = grpc_end2end_proxy_create(&proxy_def, client_args, server_args); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -113,10 +115,11 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack+proxy", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_REQUEST_PROXYING | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack+proxy", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_REQUEST_PROXYING | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c index edf42a4070..445f8a2dd1 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.c +++ b/test/core/end2end/fixtures/h2_sockpair+trace.c @@ -94,7 +94,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_end2end_test_fixture f; memset(&f, 0, sizeof(f)); f.fixture_data = sfd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture"); *sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536); diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c index 94b2623b3e..66c05c5562 100644 --- a/test/core/end2end/fixtures/h2_sockpair.c +++ b/test/core/end2end/fixtures/h2_sockpair.c @@ -88,7 +88,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_end2end_test_fixture f; memset(&f, 0, sizeof(f)); f.fixture_data = sfd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture"); *sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 65536); diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c index 6f9cf8fe26..2d845f369b 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.c +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c @@ -88,7 +88,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_end2end_test_fixture f; memset(&f, 0, sizeof(f)); f.fixture_data = sfd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); grpc_resource_quota *resource_quota = grpc_resource_quota_create("fixture"); *sfd = grpc_iomgr_create_endpoint_pair("fixture", resource_quota, 1); diff --git a/test/core/end2end/fixtures/h2_ssl.c b/test/core/end2end/fixtures/h2_ssl.c index cf44cd093c..d73944fe6c 100644 --- a/test/core/end2end/fixtures/h2_ssl.c +++ b/test/core/end2end/fixtures/h2_ssl.c @@ -64,7 +64,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } diff --git a/test/core/end2end/fixtures/h2_ssl_cert.c b/test/core/end2end/fixtures/h2_ssl_cert.c index f62331eea3..c8bd7153c2 100644 --- a/test/core/end2end/fixtures/h2_ssl_cert.c +++ b/test/core/end2end/fixtures/h2_ssl_cert.c @@ -67,7 +67,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -203,15 +205,17 @@ CLIENT_INIT(BAD_CERT_PAIR) typedef enum { SUCCESS, FAIL } test_result; -#define SSL_TEST(request_type, cert_type, result) \ - { \ - {TEST_NAME(request_type, cert_type, result), \ - FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | \ - FEATURE_MASK_SUPPORTS_PER_CALL_CREDENTIALS | \ - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL, \ - chttp2_create_fixture_secure_fullstack, CLIENT_INIT_NAME(cert_type), \ - SERVER_INIT_NAME(request_type), chttp2_tear_down_secure_fullstack}, \ - result \ +#define SSL_TEST(request_type, cert_type, result) \ + { \ + {TEST_NAME(request_type, cert_type, result), \ + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | \ + FEATURE_MASK_SUPPORTS_PER_CALL_CREDENTIALS | \ + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL, \ + chttp2_create_fixture_secure_fullstack, \ + CLIENT_INIT_NAME(cert_type), \ + SERVER_INIT_NAME(request_type), \ + chttp2_tear_down_secure_fullstack}, \ + result \ } /* All test configurations */ @@ -289,9 +293,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -310,6 +315,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_fixture f, diff --git a/test/core/end2end/fixtures/h2_ssl_proxy.c b/test/core/end2end/fixtures/h2_ssl_proxy.c index 740b075bf6..22fbeb1a83 100644 --- a/test/core/end2end/fixtures/h2_ssl_proxy.c +++ b/test/core/end2end/fixtures/h2_ssl_proxy.c @@ -100,7 +100,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( ffd->proxy = grpc_end2end_proxy_create(&proxy_def, client_args, server_args); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } diff --git a/test/core/end2end/fixtures/h2_uds.c b/test/core/end2end/fixtures/h2_uds.c index bc973ea8e3..1c7799d5d9 100644 --- a/test/core/end2end/fixtures/h2_uds.c +++ b/test/core/end2end/fixtures/h2_uds.c @@ -70,7 +70,9 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( unique++); f.fixture_data = ffd; - f.cq = grpc_completion_queue_create(NULL); + f.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + f.shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); return f; } @@ -101,9 +103,10 @@ void chttp2_tear_down_fullstack(grpc_end2end_test_fixture *f) { /* All test configurations */ static grpc_end2end_test_config configs[] = { - {"chttp2/fullstack_uds", FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | - FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | - FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, + {"chttp2/fullstack_uds", + FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION | + FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL | + FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, chttp2_create_fixture_fullstack, chttp2_init_client_fullstack, chttp2_init_server_fullstack, chttp2_tear_down_fullstack}, }; diff --git a/test/core/end2end/fixtures/proxy.c b/test/core/end2end/fixtures/proxy.c index cee053e8c5..7f23eef66a 100644 --- a/test/core/end2end/fixtures/proxy.c +++ b/test/core/end2end/fixtures/proxy.c @@ -104,7 +104,7 @@ grpc_end2end_proxy *grpc_end2end_proxy_create(const grpc_end2end_proxy_def *def, gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port, proxy->server_port); - proxy->cq = grpc_completion_queue_create(NULL); + proxy->cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); proxy->server = def->create_server(proxy->proxy_port, server_args); proxy->client = def->create_client(proxy->server_port, client_args); diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index 0de8b9459a..61f972e65e 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -314,8 +314,9 @@ static grpc_call_credentials *read_call_creds(input_stream *inp) { cred_artifact_ctx ctx = CRED_ARTIFACT_CTX_INIT; const char *access_token = read_cred_artifact(&ctx, inp, NULL, 0); grpc_call_credentials *out = - access_token == NULL ? NULL : grpc_access_token_credentials_create( - access_token, NULL); + access_token == NULL + ? NULL + : grpc_access_token_credentials_create(access_token, NULL); cred_artifact_ctx_finish(&ctx); return out; } @@ -409,8 +410,9 @@ void my_resolve_address(grpc_exec_ctx *exec_ctx, const char *addr, r->on_done = on_done; r->addrs = addresses; grpc_timer_init( - exec_ctx, &r->timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_seconds(1, GPR_TIMESPAN)), + exec_ctx, &r->timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_seconds(1, GPR_TIMESPAN)), grpc_closure_create(finish_resolve, r, grpc_schedule_on_exec_ctx), gpr_now(GPR_CLOCK_MONOTONIC)); } @@ -471,8 +473,9 @@ static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, fc->ep = ep; fc->deadline = deadline; grpc_timer_init( - exec_ctx, &fc->timer, gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), - gpr_time_from_millis(1, GPR_TIMESPAN)), + exec_ctx, &fc->timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(1, GPR_TIMESPAN)), grpc_closure_create(do_connect, fc, grpc_schedule_on_exec_ctx), gpr_now(GPR_CLOCK_MONOTONIC)); } @@ -735,7 +738,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { g_active_call = new_call(NULL, ROOT); g_resource_quota = grpc_resource_quota_create("api_fuzzer"); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); while (!is_eof(&inp) || g_channel != NULL || g_server != NULL || pending_channel_watches > 0 || pending_pings > 0 || @@ -748,8 +752,9 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { if (g_server != NULL) { if (!server_shutdown) { grpc_server_shutdown_and_notify( - g_server, cq, create_validator(assert_success_and_decrement, - &pending_server_shutdowns)); + g_server, cq, + create_validator(assert_success_and_decrement, + &pending_server_shutdowns)); server_shutdown = true; pending_server_shutdowns++; } else if (pending_server_shutdowns == 0) { @@ -854,8 +859,9 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { case 5: { if (g_server != NULL) { grpc_server_shutdown_and_notify( - g_server, cq, create_validator(assert_success_and_decrement, - &pending_server_shutdowns)); + g_server, cq, + create_validator(assert_success_and_decrement, + &pending_server_shutdowns)); pending_server_shutdowns++; server_shutdown = true; } else { diff --git a/test/core/end2end/fuzzers/client_fuzzer.c b/test/core/end2end/fuzzers/client_fuzzer.c index e7e7dbefd0..6b8c09cdf4 100644 --- a/test/core/end2end/fuzzers/client_fuzzer.c +++ b/test/core/end2end/fuzzers/client_fuzzer.c @@ -65,7 +65,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { grpc_mock_endpoint_create(discard_write, resource_quota); grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_transport *transport = grpc_create_chttp2_transport(&exec_ctx, NULL, mock_endpoint, 1); grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); diff --git a/test/core/end2end/fuzzers/server_fuzzer.c b/test/core/end2end/fuzzers/server_fuzzer.c index 186542d4b2..76fe97ec0f 100644 --- a/test/core/end2end/fuzzers/server_fuzzer.c +++ b/test/core/end2end/fuzzers/server_fuzzer.c @@ -67,7 +67,8 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { grpc_slice_from_copied_buffer((const char *)data, size)); grpc_server *server = grpc_server_create(NULL, NULL); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_server_register_completion_queue(server, cq, NULL); // TODO(ctiller): add registered methods (one for POST, one for PUT) // void *registered_method = diff --git a/test/core/end2end/goaway_server_test.c b/test/core/end2end/goaway_server_test.c index a9634bfbae..85d54c16e5 100644 --- a/test/core/end2end/goaway_server_test.c +++ b/test/core/end2end/goaway_server_test.c @@ -121,7 +121,7 @@ int main(int argc, char **argv) { grpc_metadata_array_init(&request_metadata2); grpc_call_details_init(&request_details2); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); cqv = cq_verifier_create(cq); /* reserve two ports */ diff --git a/test/core/end2end/invalid_call_argument_test.c b/test/core/end2end/invalid_call_argument_test.c index 2a9072570d..0563127af8 100644 --- a/test/core/end2end/invalid_call_argument_test.c +++ b/test/core/end2end/invalid_call_argument_test.c @@ -73,7 +73,8 @@ static void prepare_test(int is_client) { grpc_metadata_array_init(&g_state.initial_metadata_recv); grpc_metadata_array_init(&g_state.trailing_metadata_recv); g_state.deadline = grpc_timeout_seconds_to_deadline(2); - g_state.cq = grpc_completion_queue_create(NULL); + g_state.cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); g_state.cqv = cq_verifier_create(g_state.cq); g_state.details = grpc_empty_slice(); memset(g_state.ops, 0, sizeof(g_state.ops)); @@ -123,6 +124,7 @@ static void prepare_test(int is_client) { } static void cleanup_test() { + grpc_completion_queue *shutdown_cq; grpc_call_destroy(g_state.call); cq_verifier_destroy(g_state.cqv); grpc_channel_destroy(g_state.chan); @@ -131,12 +133,15 @@ static void cleanup_test() { grpc_metadata_array_destroy(&g_state.trailing_metadata_recv); if (!g_state.is_client) { + shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); grpc_call_destroy(g_state.server_call); - grpc_server_shutdown_and_notify(g_state.server, g_state.cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck(g_state.cq, tag(1000), + grpc_server_shutdown_and_notify(g_state.server, shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); grpc_server_destroy(g_state.server); grpc_call_details_destroy(&g_state.call_details); grpc_metadata_array_destroy(&g_state.server_initial_metadata_recv); diff --git a/test/core/end2end/multiple_server_queues_test.c b/test/core/end2end/multiple_server_queues_test.c index 5e2eaf4ae9..94e615f1aa 100644 --- a/test/core/end2end/multiple_server_queues_test.c +++ b/test/core/end2end/multiple_server_queues_test.c @@ -37,27 +37,37 @@ int main(int argc, char **argv) { grpc_completion_queue *cq1; grpc_completion_queue *cq2; + grpc_completion_queue *cq3; grpc_server *server; grpc_test_init(argc, argv); grpc_init(); - cq1 = grpc_completion_queue_create(NULL); - cq2 = grpc_completion_queue_create(NULL); + cq1 = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + cq2 = grpc_completion_queue_create(GRPC_CQ_NEXT, NON_LISTENING, NULL); + cq3 = grpc_completion_queue_create(GRPC_CQ_NEXT, NON_POLLING, NULL); + server = grpc_server_create(NULL, NULL); grpc_server_register_completion_queue(server, cq1, NULL); grpc_server_add_insecure_http2_port(server, "[::]:0"); grpc_server_register_completion_queue(server, cq2, NULL); + grpc_server_register_completion_queue(server, cq3, NULL); + grpc_server_start(server); grpc_server_shutdown_and_notify(server, cq2, NULL); grpc_completion_queue_next(cq2, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); /* cue queue hang */ grpc_completion_queue_shutdown(cq1); grpc_completion_queue_shutdown(cq2); + grpc_completion_queue_shutdown(cq3); + grpc_completion_queue_next(cq1, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); grpc_completion_queue_next(cq2, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + grpc_completion_queue_next(cq3, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + grpc_server_destroy(server); grpc_completion_queue_destroy(cq1); grpc_completion_queue_destroy(cq2); + grpc_completion_queue_destroy(cq3); grpc_shutdown(); return 0; } diff --git a/test/core/end2end/no_server_test.c b/test/core/end2end/no_server_test.c index 26d26d8f7a..e16c129d67 100644 --- a/test/core/end2end/no_server_test.c +++ b/test/core/end2end/no_server_test.c @@ -59,7 +59,7 @@ int main(int argc, char **argv) { grpc_metadata_array_init(&trailing_metadata_recv); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); cqv = cq_verifier_create(cq); /* create a call, channel to a non existant server */ diff --git a/test/core/end2end/tests/authority_not_supported.c b/test/core/end2end/tests/authority_not_supported.c index 7db2fd6b27..4b6878794d 100644 --- a/test/core/end2end/tests/authority_not_supported.c +++ b/test/core/end2end/tests/authority_not_supported.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Request/response with metadata and payload.*/ diff --git a/test/core/end2end/tests/bad_hostname.c b/test/core/end2end/tests/bad_hostname.c index f50a5805a2..c7bdd34a0b 100644 --- a/test/core/end2end/tests/bad_hostname.c +++ b/test/core/end2end/tests/bad_hostname.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_fixture f) { diff --git a/test/core/end2end/tests/binary_metadata.c b/test/core/end2end/tests/binary_metadata.c index 7fb60f4495..90258185ba 100644 --- a/test/core/end2end/tests/binary_metadata.c +++ b/test/core/end2end/tests/binary_metadata.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Request/response with metadata and payload.*/ diff --git a/test/core/end2end/tests/call_creds.c b/test/core/end2end/tests/call_creds.c index 38cba50e12..2a9269345e 100644 --- a/test/core/end2end/tests/call_creds.c +++ b/test/core/end2end/tests/call_creds.c @@ -90,9 +90,9 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + f->shutdown_cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -111,6 +111,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void print_auth_context(int is_client, const grpc_auth_context *ctx) { diff --git a/test/core/end2end/tests/cancel_after_accept.c b/test/core/end2end/tests/cancel_after_accept.c index 1a92aa4837..3c59511ebe 100644 --- a/test/core/end2end/tests/cancel_after_accept.c +++ b/test/core/end2end/tests/cancel_after_accept.c @@ -79,9 +79,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -100,6 +101,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Cancel after accept, no payload */ diff --git a/test/core/end2end/tests/cancel_after_client_done.c b/test/core/end2end/tests/cancel_after_client_done.c index 0afeecb037..2edadeff42 100644 --- a/test/core/end2end/tests/cancel_after_client_done.c +++ b/test/core/end2end/tests/cancel_after_client_done.c @@ -73,9 +73,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -94,6 +95,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Cancel after accept with a writes closed, no payload */ diff --git a/test/core/end2end/tests/cancel_after_invoke.c b/test/core/end2end/tests/cancel_after_invoke.c index 8a96ef2f89..6a7fae8614 100644 --- a/test/core/end2end/tests/cancel_after_invoke.c +++ b/test/core/end2end/tests/cancel_after_invoke.c @@ -75,9 +75,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -96,6 +97,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Cancel after invoke, no payload */ diff --git a/test/core/end2end/tests/cancel_before_invoke.c b/test/core/end2end/tests/cancel_before_invoke.c index 586aa7ead3..e96ee5a0e1 100644 --- a/test/core/end2end/tests/cancel_before_invoke.c +++ b/test/core/end2end/tests/cancel_before_invoke.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Cancel before invoke */ diff --git a/test/core/end2end/tests/cancel_in_a_vacuum.c b/test/core/end2end/tests/cancel_in_a_vacuum.c index bc462ddcf5..630527a658 100644 --- a/test/core/end2end/tests/cancel_in_a_vacuum.c +++ b/test/core/end2end/tests/cancel_in_a_vacuum.c @@ -73,9 +73,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -94,6 +95,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Cancel and do nothing */ diff --git a/test/core/end2end/tests/cancel_with_status.c b/test/core/end2end/tests/cancel_with_status.c index 7d03fe5580..2a81feaf0b 100644 --- a/test/core/end2end/tests/cancel_with_status.c +++ b/test/core/end2end/tests/cancel_with_status.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/compressed_payload.c b/test/core/end2end/tests/compressed_payload.c index 7dd8c112d1..1ad4d118e0 100644 --- a/test/core/end2end/tests/compressed_payload.c +++ b/test/core/end2end/tests/compressed_payload.c @@ -80,9 +80,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -101,6 +102,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void request_for_disabled_algorithm( diff --git a/test/core/end2end/tests/connectivity.c b/test/core/end2end/tests/connectivity.c index 979419a100..eb84aaed16 100644 --- a/test/core/end2end/tests/connectivity.c +++ b/test/core/end2end/tests/connectivity.c @@ -171,6 +171,9 @@ static void test_connectivity(grpc_end2end_test_config config) { grpc_channel_destroy(f.client); grpc_completion_queue_shutdown(f.cq); grpc_completion_queue_destroy(f.cq); + + /* shutdown_cq is not used in this test */ + grpc_completion_queue_destroy(f.shutdown_cq); config.tear_down_data(&f); cq_verifier_destroy(cqv); diff --git a/test/core/end2end/tests/default_host.c b/test/core/end2end/tests/default_host.c index bc1829b24b..b765de3cea 100644 --- a/test/core/end2end/tests/default_host.c +++ b/test/core/end2end/tests/default_host.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_fixture f) { diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c index 05440a6f56..d7265a7780 100644 --- a/test/core/end2end/tests/disappearing_server.c +++ b/test/core/end2end/tests/disappearing_server.c @@ -77,6 +77,9 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + + /* Note: shutdown_cq was unused in this test */ + grpc_completion_queue_destroy(f->shutdown_cq); } static void do_request_and_shutdown_server(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/empty_batch.c b/test/core/end2end/tests/empty_batch.c index 50bb6b849e..9aa9294258 100644 --- a/test/core/end2end/tests/empty_batch.c +++ b/test/core/end2end/tests/empty_batch.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void empty_batch_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/filter_call_init_fails.c b/test/core/end2end/tests/filter_call_init_fails.c index d2d6e82d57..dee3531f80 100644 --- a/test/core/end2end/tests/filter_call_init_fails.c +++ b/test/core/end2end/tests/filter_call_init_fails.c @@ -80,9 +80,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -101,6 +102,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } // Simple request via a server filter that always fails to initialize diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 25e606556d..d41eb48101 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -77,9 +77,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -98,6 +99,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Simple request via a server filter that always closes the stream.*/ diff --git a/test/core/end2end/tests/filter_latency.c b/test/core/end2end/tests/filter_latency.c index d05e9e79a1..ffe9880d81 100644 --- a/test/core/end2end/tests/filter_latency.c +++ b/test/core/end2end/tests/filter_latency.c @@ -84,9 +84,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -105,6 +106,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } // Simple request via a server filter that saves the reported latency value. diff --git a/test/core/end2end/tests/graceful_server_shutdown.c b/test/core/end2end/tests/graceful_server_shutdown.c index a3ad260cc2..8e35d27af9 100644 --- a/test/core/end2end/tests/graceful_server_shutdown.c +++ b/test/core/end2end/tests/graceful_server_shutdown.c @@ -89,6 +89,8 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + /* Note: shutdown_cq is not used in this test */ + grpc_completion_queue_destroy(f->shutdown_cq); } static void test_early_server_shutdown_finishes_inflight_calls( diff --git a/test/core/end2end/tests/high_initial_seqno.c b/test/core/end2end/tests/high_initial_seqno.c index cca8532b0e..1707c8192c 100644 --- a/test/core/end2end/tests/high_initial_seqno.c +++ b/test/core/end2end/tests/high_initial_seqno.c @@ -76,9 +76,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -97,6 +98,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/hpack_size.c b/test/core/end2end/tests/hpack_size.c index 7601722dee..68a33f3008 100644 --- a/test/core/end2end/tests/hpack_size.c +++ b/test/core/end2end/tests/hpack_size.c @@ -216,9 +216,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -237,6 +238,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/idempotent_request.c b/test/core/end2end/tests/idempotent_request.c index cef2e100be..3455c6eb41 100644 --- a/test/core/end2end/tests/idempotent_request.c +++ b/test/core/end2end/tests/idempotent_request.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/invoke_large_request.c b/test/core/end2end/tests/invoke_large_request.c index d799bd8ccf..3bce8dffe4 100644 --- a/test/core/end2end/tests/invoke_large_request.c +++ b/test/core/end2end/tests/invoke_large_request.c @@ -71,9 +71,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -92,6 +93,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static grpc_slice large_slice(void) { diff --git a/test/core/end2end/tests/keepalive_timeout.c b/test/core/end2end/tests/keepalive_timeout.c index 4296be3619..66dcc1cd10 100644 --- a/test/core/end2end/tests/keepalive_timeout.c +++ b/test/core/end2end/tests/keepalive_timeout.c @@ -75,10 +75,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT( - grpc_completion_queue_pluck(f->cq, tag(1000), five_seconds_time(), NULL) - .type == GRPC_OP_COMPLETE); + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + five_seconds_time(), NULL) + .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; } @@ -96,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Client sends a request, server replies with a payload, then waits for the diff --git a/test/core/end2end/tests/large_metadata.c b/test/core/end2end/tests/large_metadata.c index ac4c0e7f3b..227db3293f 100644 --- a/test/core/end2end/tests/large_metadata.c +++ b/test/core/end2end/tests/large_metadata.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } // Request with a large amount of metadata. diff --git a/test/core/end2end/tests/load_reporting_hook.c b/test/core/end2end/tests/load_reporting_hook.c index d1ee26fe50..d8e9eaafa9 100644 --- a/test/core/end2end/tests/load_reporting_hook.c +++ b/test/core/end2end/tests/load_reporting_hook.c @@ -99,9 +99,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -120,6 +121,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void request_response_with_payload( diff --git a/test/core/end2end/tests/max_concurrent_streams.c b/test/core/end2end/tests/max_concurrent_streams.c index e81a628944..ebc92c6965 100644 --- a/test/core/end2end/tests/max_concurrent_streams.c +++ b/test/core/end2end/tests/max_concurrent_streams.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index b15d30f58c..c015b653b8 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -81,9 +81,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -102,6 +103,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } // Test with request larger than the limit. diff --git a/test/core/end2end/tests/negative_deadline.c b/test/core/end2end/tests/negative_deadline.c index 0b61efbac9..f0e81533c2 100644 --- a/test/core/end2end/tests/negative_deadline.c +++ b/test/core/end2end/tests/negative_deadline.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/network_status_change.c b/test/core/end2end/tests/network_status_change.c index 7540ce93a1..3a420cce23 100644 --- a/test/core/end2end/tests/network_status_change.c +++ b/test/core/end2end/tests/network_status_change.c @@ -75,9 +75,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -96,6 +97,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Client sends a request with payload, server reads then returns status. */ diff --git a/test/core/end2end/tests/no_logging.c b/test/core/end2end/tests/no_logging.c index 56e48a88a8..4233f9ab5b 100644 --- a/test/core/end2end/tests/no_logging.c +++ b/test/core/end2end/tests/no_logging.c @@ -102,9 +102,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -123,6 +124,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/no_op.c b/test/core/end2end/tests/no_op.c index 62fc728c3e..d5712cb1cc 100644 --- a/test/core/end2end/tests/no_op.c +++ b/test/core/end2end/tests/no_op.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void test_no_op(grpc_end2end_test_config config) { diff --git a/test/core/end2end/tests/payload.c b/test/core/end2end/tests/payload.c index b04ee5705c..b0b639681f 100644 --- a/test/core/end2end/tests/payload.c +++ b/test/core/end2end/tests/payload.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Creates and returns a grpc_slice containing random alphanumeric characters. diff --git a/test/core/end2end/tests/ping.c b/test/core/end2end/tests/ping.c index f5bfac2255..966288239c 100644 --- a/test/core/end2end/tests/ping.c +++ b/test/core/end2end/tests/ping.c @@ -95,6 +95,9 @@ static void test_ping(grpc_end2end_test_config config) { grpc_channel_destroy(f.client); grpc_completion_queue_shutdown(f.cq); grpc_completion_queue_destroy(f.cq); + + /* f.shutdown_cq is not used in this test */ + grpc_completion_queue_destroy(f.shutdown_cq); config.tear_down_data(&f); cq_verifier_destroy(cqv); diff --git a/test/core/end2end/tests/ping_pong_streaming.c b/test/core/end2end/tests/ping_pong_streaming.c index 848f76018d..a4b48ea5c3 100644 --- a/test/core/end2end/tests/ping_pong_streaming.c +++ b/test/core/end2end/tests/ping_pong_streaming.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Client pings and server pongs. Repeat messages rounds before finishing. */ diff --git a/test/core/end2end/tests/registered_call.c b/test/core/end2end/tests/registered_call.c index 9c8ce89c83..4846cfe99e 100644 --- a/test/core/end2end/tests/registered_call.c +++ b/test/core/end2end/tests/registered_call.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/request_with_flags.c b/test/core/end2end/tests/request_with_flags.c index 329359e08b..fa943cf57d 100644 --- a/test/core/end2end/tests/request_with_flags.c +++ b/test/core/end2end/tests/request_with_flags.c @@ -73,9 +73,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -94,6 +95,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void test_invoke_request_with_flags( diff --git a/test/core/end2end/tests/request_with_payload.c b/test/core/end2end/tests/request_with_payload.c index f71f92bbb8..3cf6d828b5 100644 --- a/test/core/end2end/tests/request_with_payload.c +++ b/test/core/end2end/tests/request_with_payload.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Client sends a request with payload, server reads then returns status. */ diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index 4f9ed7a3a1..115fddc167 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Creates and returns a grpc_slice containing random alphanumeric characters. diff --git a/test/core/end2end/tests/server_finishes_request.c b/test/core/end2end/tests/server_finishes_request.c index b42d17002e..b545f14454 100644 --- a/test/core/end2end/tests/server_finishes_request.c +++ b/test/core/end2end/tests/server_finishes_request.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/shutdown_finishes_calls.c b/test/core/end2end/tests/shutdown_finishes_calls.c index c019682ea6..d74b740a6c 100644 --- a/test/core/end2end/tests/shutdown_finishes_calls.c +++ b/test/core/end2end/tests/shutdown_finishes_calls.c @@ -82,6 +82,8 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + /* f->shutdown_cq is not used in this test */ + grpc_completion_queue_destroy(f->shutdown_cq); } static void test_early_server_shutdown_finishes_inflight_calls( diff --git a/test/core/end2end/tests/shutdown_finishes_tags.c b/test/core/end2end/tests/shutdown_finishes_tags.c index 5540d2aab9..e5d3c43e69 100644 --- a/test/core/end2end/tests/shutdown_finishes_tags.c +++ b/test/core/end2end/tests/shutdown_finishes_tags.c @@ -82,6 +82,8 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + /* f->shutdown_cq is not used in this test */ + grpc_completion_queue_destroy(f->shutdown_cq); } static void test_early_server_shutdown_finishes_tags( diff --git a/test/core/end2end/tests/simple_cacheable_request.c b/test/core/end2end/tests/simple_cacheable_request.c index 4eef02e9ee..bb594a9fbd 100644 --- a/test/core/end2end/tests/simple_cacheable_request.c +++ b/test/core/end2end/tests/simple_cacheable_request.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Request/response with metadata and payload.*/ diff --git a/test/core/end2end/tests/simple_delayed_request.c b/test/core/end2end/tests/simple_delayed_request.c index e3b6aee783..489ed5c073 100644 --- a/test/core/end2end/tests/simple_delayed_request.c +++ b/test/core/end2end/tests/simple_delayed_request.c @@ -60,9 +60,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -81,6 +82,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_delayed_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/simple_metadata.c b/test/core/end2end/tests/simple_metadata.c index 7ab5563cfa..f8a5254fe8 100644 --- a/test/core/end2end/tests/simple_metadata.c +++ b/test/core/end2end/tests/simple_metadata.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Request/response with metadata and payload.*/ diff --git a/test/core/end2end/tests/simple_request.c b/test/core/end2end/tests/simple_request.c index af5d74959e..47fbff3375 100644 --- a/test/core/end2end/tests/simple_request.c +++ b/test/core/end2end/tests/simple_request.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } static void simple_request_body(grpc_end2end_test_config config, diff --git a/test/core/end2end/tests/streaming_error_response.c b/test/core/end2end/tests/streaming_error_response.c index 42055907c8..354c5091fd 100644 --- a/test/core/end2end/tests/streaming_error_response.c +++ b/test/core/end2end/tests/streaming_error_response.c @@ -74,9 +74,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -95,6 +96,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Client sends a request with payload, server reads then returns status. */ diff --git a/test/core/end2end/tests/trailing_metadata.c b/test/core/end2end/tests/trailing_metadata.c index dbbda505bc..0cb111976a 100644 --- a/test/core/end2end/tests/trailing_metadata.c +++ b/test/core/end2end/tests/trailing_metadata.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Request/response with metadata and payload.*/ diff --git a/test/core/end2end/tests/write_buffering.c b/test/core/end2end/tests/write_buffering.c index abf90ca6e0..7bb44f5c6f 100644 --- a/test/core/end2end/tests/write_buffering.c +++ b/test/core/end2end/tests/write_buffering.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Client sends a request with payload, server reads then returns status. */ diff --git a/test/core/end2end/tests/write_buffering_at_end.c b/test/core/end2end/tests/write_buffering_at_end.c index 8c02b425ba..2ff506a744 100644 --- a/test/core/end2end/tests/write_buffering_at_end.c +++ b/test/core/end2end/tests/write_buffering_at_end.c @@ -72,9 +72,10 @@ static void drain_cq(grpc_completion_queue *cq) { static void shutdown_server(grpc_end2end_test_fixture *f) { if (!f->server) return; - grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - f->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); grpc_server_destroy(f->server); f->server = NULL; @@ -93,6 +94,7 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_shutdown(f->cq); drain_cq(f->cq); grpc_completion_queue_destroy(f->cq); + grpc_completion_queue_destroy(f->shutdown_cq); } /* Client sends a request with payload, server reads then returns status. */ diff --git a/test/core/fling/client.c b/test/core/fling/client.c index 85bab6d431..188696c189 100644 --- a/test/core/fling/client.c +++ b/test/core/fling/client.c @@ -208,7 +208,7 @@ int main(int argc, char **argv) { } channel = grpc_insecure_channel_create(target, NULL, NULL); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); the_buffer = grpc_raw_byte_buffer_create(&slice, (size_t)payload_size); histogram = gpr_histogram_create(0.01, 60e9); diff --git a/test/core/fling/server.c b/test/core/fling/server.c index 7ea54b1167..f1b5f39a60 100644 --- a/test/core/fling/server.c +++ b/test/core/fling/server.c @@ -185,6 +185,7 @@ int main(int argc, char **argv) { call_state *s; char *addr_buf = NULL; gpr_cmdline *cl; + grpc_completion_queue *shutdown_cq; int shutdown_started = 0; int shutdown_finished = 0; @@ -214,7 +215,7 @@ int main(int argc, char **argv) { } gpr_log(GPR_INFO, "creating server on: %s", addr); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); if (secure) { grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key, test_server1_cert}; @@ -242,16 +243,24 @@ int main(int argc, char **argv) { while (!shutdown_finished) { if (got_sigint && !shutdown_started) { gpr_log(GPR_INFO, "Shutting down due to SIGINT"); - grpc_server_shutdown_and_notify(server, cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) - .type == GRPC_OP_COMPLETE); + + shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); + grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000)); + + GPR_ASSERT( + grpc_completion_queue_pluck(shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), NULL) + .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); + grpc_completion_queue_shutdown(cq); shutdown_started = 1; } ev = grpc_completion_queue_next( - cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000000, GPR_TIMESPAN)), + cq, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000, GPR_TIMESPAN)), NULL); s = ev.tag; switch (ev.type) { diff --git a/test/core/handshake/client_ssl.c b/test/core/handshake/client_ssl.c index 5cfe60de4b..72e8ffe02a 100644 --- a/test/core/handshake/client_ssl.c +++ b/test/core/handshake/client_ssl.c @@ -289,7 +289,9 @@ static bool client_ssl_test(char *server_alpn_preferred) { // completed and we know that the client's ALPN list satisfied the server. int retries = 10; grpc_connectivity_state state = GRPC_CHANNEL_IDLE; - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); + while (state != GRPC_CHANNEL_READY && retries-- > 0) { grpc_channel_watch_connectivity_state( channel, state, grpc_timeout_seconds_to_deadline(3), cq, NULL); diff --git a/test/core/handshake/server_ssl.c b/test/core/handshake/server_ssl.c index 0bd5a03cff..f61ad786bc 100644 --- a/test/core/handshake/server_ssl.c +++ b/test/core/handshake/server_ssl.c @@ -104,7 +104,8 @@ static void server_thread(void *arg) { GPR_ASSERT(grpc_server_add_secure_http2_port(server, addr, ssl_creds)); free(addr); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_server_register_completion_queue(server, cq, NULL); grpc_server_start(server); diff --git a/test/core/memory_usage/client.c b/test/core/memory_usage/client.c index 09f0e2d867..588f559d4c 100644 --- a/test/core/memory_usage/client.c +++ b/test/core/memory_usage/client.c @@ -222,7 +222,7 @@ int main(int argc, char **argv) { calls[k].details = grpc_empty_slice(); } - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); struct grpc_memory_counters client_channel_start = grpc_memory_counters_snapshot(); @@ -260,8 +260,9 @@ int main(int argc, char **argv) { do { event = grpc_completion_queue_next( - cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(10000, GPR_TIMESPAN)), + cq, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(10000, GPR_TIMESPAN)), NULL); } while (event.type != GRPC_QUEUE_TIMEOUT); diff --git a/test/core/memory_usage/server.c b/test/core/memory_usage/server.c index ab059c25b8..eb9794b558 100644 --- a/test/core/memory_usage/server.c +++ b/test/core/memory_usage/server.c @@ -161,6 +161,7 @@ int main(int argc, char **argv) { grpc_event ev; char *addr_buf = NULL; gpr_cmdline *cl; + grpc_completion_queue *shutdown_cq; int shutdown_started = 0; int shutdown_finished = 0; @@ -188,7 +189,7 @@ int main(int argc, char **argv) { } gpr_log(GPR_INFO, "creating server on: %s", addr); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); struct grpc_memory_counters before_server_create = grpc_memory_counters_snapshot(); @@ -230,16 +231,22 @@ int main(int argc, char **argv) { while (!shutdown_finished) { if (got_sigint && !shutdown_started) { gpr_log(GPR_INFO, "Shutting down due to SIGINT"); - grpc_server_shutdown_and_notify(server, cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) - .type == GRPC_OP_COMPLETE); + + shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); + grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000)); + GPR_ASSERT( + grpc_completion_queue_pluck(shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), NULL) + .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); grpc_completion_queue_shutdown(cq); shutdown_started = 1; } ev = grpc_completion_queue_next( - cq, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(1000000, GPR_TIMESPAN)), + cq, + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(1000000, GPR_TIMESPAN)), NULL); fling_call *s = ev.tag; switch (ev.type) { diff --git a/test/core/surface/alarm_test.c b/test/core/surface/alarm_test.c index 4afe357c27..7419d388a3 100644 --- a/test/core/surface/alarm_test.c +++ b/test/core/surface/alarm_test.c @@ -58,7 +58,7 @@ static void test_alarm(void) { grpc_completion_queue *cc; LOG_TEST("test_alarm"); - cc = grpc_completion_queue_create(NULL); + cc = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); { /* regular expiry */ grpc_event ev; diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index 07f6a9869b..f76074f207 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -51,33 +51,74 @@ static void *create_test_tag(void) { static void shutdown_and_destroy(grpc_completion_queue *cc) { grpc_event ev; grpc_completion_queue_shutdown(cc); - ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + + switch (grpc_get_cq_completion_type(cc)) { + case GRPC_CQ_NEXT: { + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), + NULL); + break; + } + case GRPC_CQ_PLUCK: { + ev = grpc_completion_queue_pluck(cc, create_test_tag(), + gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + break; + } + default: { + gpr_log(GPR_ERROR, "Unknown completion type"); + break; + } + } + GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); grpc_completion_queue_destroy(cc); } /* ensure we can create and destroy a completion channel */ static void test_no_op(void) { + grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK}; + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING, + NON_POLLING}; LOG_TEST("test_no_op"); - shutdown_and_destroy(grpc_completion_queue_create(NULL)); + + for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) { + for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) { + shutdown_and_destroy(grpc_completion_queue_create( + completion_types[i], polling_types[j], NULL)); + } + } } static void test_pollset_conversion(void) { - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); - GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq); - shutdown_and_destroy(cq); + grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK}; + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING}; + grpc_completion_queue *cq; + + LOG_TEST("test_pollset_conversion"); + + for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) { + for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) { + cq = grpc_completion_queue_create(completion_types[i], polling_types[j], + NULL); + GPR_ASSERT(grpc_cq_from_pollset(grpc_cq_pollset(cq)) == cq); + shutdown_and_destroy(cq); + } + } } static void test_wait_empty(void) { + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING, + NON_POLLING}; grpc_completion_queue *cc; grpc_event event; LOG_TEST("test_wait_empty"); - cc = grpc_completion_queue_create(NULL); - event = grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), NULL); - GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT); - shutdown_and_destroy(cc); + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + cc = grpc_completion_queue_create(GRPC_CQ_NEXT, polling_types[i], NULL); + event = grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT); + shutdown_and_destroy(cc); + } } static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg, @@ -87,50 +128,66 @@ static void test_cq_end_op(void) { grpc_event ev; grpc_completion_queue *cc; grpc_cq_completion completion; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING, + NON_POLLING}; + + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; void *tag = create_test_tag(); LOG_TEST("test_cq_end_op"); - cc = grpc_completion_queue_create(NULL); + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + exec_ctx = init_exec_ctx; // Reset exec_ctx + cc = grpc_completion_queue_create(GRPC_CQ_NEXT, polling_types[i], NULL); - grpc_cq_begin_op(cc, tag); - grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, - NULL, &completion); + grpc_cq_begin_op(cc, tag); + grpc_cq_end_op(&exec_ctx, cc, tag, GRPC_ERROR_NONE, + do_nothing_end_completion, NULL, &completion); - ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); - GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); - GPR_ASSERT(ev.tag == tag); - GPR_ASSERT(ev.success); + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); + GPR_ASSERT(ev.tag == tag); + GPR_ASSERT(ev.success); - shutdown_and_destroy(cc); - grpc_exec_ctx_finish(&exec_ctx); + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } } static void test_shutdown_then_next_polling(void) { + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING, + NON_POLLING}; grpc_completion_queue *cc; grpc_event event; LOG_TEST("test_shutdown_then_next_polling"); - cc = grpc_completion_queue_create(NULL); - grpc_completion_queue_shutdown(cc); - event = - grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); - GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN); - grpc_completion_queue_destroy(cc); + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + cc = grpc_completion_queue_create(GRPC_CQ_NEXT, polling_types[i], NULL); + grpc_completion_queue_shutdown(cc); + event = + grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN); + grpc_completion_queue_destroy(cc); + } } static void test_shutdown_then_next_with_timeout(void) { + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING, + NON_POLLING}; grpc_completion_queue *cc; grpc_event event; LOG_TEST("test_shutdown_then_next_with_timeout"); - cc = grpc_completion_queue_create(NULL); - grpc_completion_queue_shutdown(cc); - event = - grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); - GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN); - grpc_completion_queue_destroy(cc); + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + cc = grpc_completion_queue_create(GRPC_CQ_NEXT, polling_types[i], NULL); + + grpc_completion_queue_shutdown(cc); + event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); + GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN); + grpc_completion_queue_destroy(cc); + } } static void test_pluck(void) { @@ -138,7 +195,10 @@ static void test_pluck(void) { grpc_completion_queue *cc; void *tags[128]; grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING, + NON_POLLING}; + grpc_exec_ctx init_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx exec_ctx; unsigned i, j; LOG_TEST("test_pluck"); @@ -150,49 +210,62 @@ static void test_pluck(void) { } } - cc = grpc_completion_queue_create(NULL); + for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) { + exec_ctx = init_exec_ctx; // reset exec_ctx + cc = grpc_completion_queue_create(GRPC_CQ_PLUCK, polling_types[pidx], NULL); - for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, tags[i]); - grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, - do_nothing_end_completion, NULL, &completions[i]); - } + for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { + grpc_cq_begin_op(cc, tags[i]); + grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, + do_nothing_end_completion, NULL, &completions[i]); + } - for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - ev = grpc_completion_queue_pluck(cc, tags[i], - gpr_inf_past(GPR_CLOCK_REALTIME), NULL); - GPR_ASSERT(ev.tag == tags[i]); - } + for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { + ev = grpc_completion_queue_pluck(cc, tags[i], + gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.tag == tags[i]); + } - for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - grpc_cq_begin_op(cc, tags[i]); - grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, - do_nothing_end_completion, NULL, &completions[i]); - } + for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { + grpc_cq_begin_op(cc, tags[i]); + grpc_cq_end_op(&exec_ctx, cc, tags[i], GRPC_ERROR_NONE, + do_nothing_end_completion, NULL, &completions[i]); + } - for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { - ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1], - gpr_inf_past(GPR_CLOCK_REALTIME), NULL); - GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]); - } + for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { + ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1], + gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]); + } - shutdown_and_destroy(cc); - grpc_exec_ctx_finish(&exec_ctx); + shutdown_and_destroy(cc); + grpc_exec_ctx_finish(&exec_ctx); + } } static void test_pluck_after_shutdown(void) { + grpc_cq_polling_type polling_types[] = {DEFAULT_POLLING, NON_LISTENING, + NON_POLLING}; grpc_event ev; grpc_completion_queue *cc; LOG_TEST("test_pluck_after_shutdown"); - cc = grpc_completion_queue_create(NULL); - grpc_completion_queue_shutdown(cc); - ev = grpc_completion_queue_pluck(cc, NULL, gpr_inf_future(GPR_CLOCK_REALTIME), - NULL); - GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); - grpc_completion_queue_destroy(cc); + + for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) { + cc = grpc_completion_queue_create(GRPC_CQ_PLUCK, polling_types[i], NULL); + grpc_completion_queue_shutdown(cc); + ev = grpc_completion_queue_pluck(cc, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); + grpc_completion_queue_destroy(cc); + } } +struct thread_state { + grpc_completion_queue *cc; + void *tag; +}; + int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c index 2d55ead843..b101cd13f7 100644 --- a/test/core/surface/completion_queue_threading_test.c +++ b/test/core/surface/completion_queue_threading_test.c @@ -52,7 +52,24 @@ static void *create_test_tag(void) { static void shutdown_and_destroy(grpc_completion_queue *cc) { grpc_event ev; grpc_completion_queue_shutdown(cc); - ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + + switch (grpc_get_cq_completion_type(cc)) { + case GRPC_CQ_NEXT: { + ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), + NULL); + break; + } + case GRPC_CQ_PLUCK: { + ev = grpc_completion_queue_pluck(cc, create_test_tag(), + gpr_inf_past(GPR_CLOCK_REALTIME), NULL); + break; + } + default: { + gpr_log(GPR_ERROR, "Unknown completion type"); + break; + } + } + GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN); grpc_completion_queue_destroy(cc); } @@ -84,7 +101,7 @@ static void test_too_many_plucks(void) { LOG_TEST("test_too_many_plucks"); - cc = grpc_completion_queue_create(NULL); + cc = grpc_completion_queue_create(GRPC_CQ_PLUCK, DEFAULT_POLLING, NULL); gpr_thd_options_set_joinable(&thread_options); for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -210,7 +227,8 @@ static void test_threading(size_t producers, size_t consumers) { gpr_malloc((producers + consumers) * sizeof(test_thread_options)); gpr_event phase1 = GPR_EVENT_INIT; gpr_event phase2 = GPR_EVENT_INIT; - grpc_completion_queue *cc = grpc_completion_queue_create(NULL); + grpc_completion_queue *cc = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); size_t i; size_t total_consumed = 0; static int optid = 101; diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index ff927385d4..3758b86fab 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -66,7 +66,8 @@ static int detag(void *p) { return (int)(uintptr_t)p; } void create_loop_destroy(void *addr) { for (int i = 0; i < NUM_OUTER_LOOPS; ++i) { - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_channel *chan = grpc_insecure_channel_create((char *)addr, NULL, NULL); for (int j = 0; j < NUM_INNER_LOOPS; ++j) { @@ -195,7 +196,7 @@ int main(int argc, char **argv) { gpr_asprintf(&args.addr, "localhost:%d", port); args.server = grpc_server_create(NULL, NULL); grpc_server_add_insecure_http2_port(args.server, args.addr); - args.cq = grpc_completion_queue_create(NULL); + args.cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_server_register_completion_queue(args.server, args.cq, NULL); grpc_server_start(args.server); gpr_thd_new(&server, server_thread, &args, &options); diff --git a/test/core/surface/lame_client_test.c b/test/core/surface/lame_client_test.c index 9deb50bb04..0cb183a4f0 100644 --- a/test/core/surface/lame_client_test.c +++ b/test/core/surface/lame_client_test.c @@ -108,7 +108,7 @@ int main(int argc, char **argv) { GPR_ASSERT(GRPC_CHANNEL_SHUTDOWN == grpc_channel_check_connectivity_state(chan, 0)); - cq = grpc_completion_queue_create(NULL); + cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_slice host = grpc_slice_from_static_string("anywhere"); call = grpc_channel_create_call(chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq, diff --git a/test/core/surface/sequential_connectivity_test.c b/test/core/surface/sequential_connectivity_test.c index 5f66f90037..37678a40d9 100644 --- a/test/core/surface/sequential_connectivity_test.c +++ b/test/core/surface/sequential_connectivity_test.c @@ -76,7 +76,8 @@ static void run_test(const test_fixture *fixture) { grpc_server *server = grpc_server_create(NULL, NULL); fixture->add_server_port(server, addr); - grpc_completion_queue *server_cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *server_cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_server_register_completion_queue(server, server_cq, NULL); grpc_server_start(server); @@ -86,7 +87,8 @@ static void run_test(const test_fixture *fixture) { gpr_thd_options_set_joinable(&thdopt); gpr_thd_new(&server_thread, server_thread_func, &sta, &thdopt); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_channel *channels[NUM_CONNECTIONS]; for (size_t i = 0; i < NUM_CONNECTIONS; i++) { channels[i] = fixture->create_channel(addr); diff --git a/test/core/surface/server_chttp2_test.c b/test/core/surface/server_chttp2_test.c index 6c178abdad..9fac63418f 100644 --- a/test/core/surface/server_chttp2_test.c +++ b/test/core/surface/server_chttp2_test.c @@ -60,7 +60,8 @@ void test_add_same_port_twice() { int port = grpc_pick_unused_port_or_die(); char *addr = NULL; - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); grpc_server *server = grpc_server_create(&args, NULL); grpc_server_credentials *fake_creds = grpc_fake_transport_security_server_credentials_create(); diff --git a/test/core/surface/server_test.c b/test/core/surface/server_test.c index 3fd1c2c266..aa54794d14 100644 --- a/test/core/surface/server_test.c +++ b/test/core/surface/server_test.c @@ -70,7 +70,8 @@ void test_register_method_fail(void) { } void test_request_call_on_no_server_cq(void) { - grpc_completion_queue *cc = grpc_completion_queue_create(NULL); + grpc_completion_queue *cc = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_server *server = grpc_server_create(NULL, NULL); GPR_ASSERT(GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE == grpc_server_request_call(server, NULL, NULL, NULL, cc, cc, NULL)); @@ -91,7 +92,8 @@ void test_bind_server_twice(void) { char *addr; grpc_server *server1 = grpc_server_create(&args, NULL); grpc_server *server2 = grpc_server_create(&args, NULL); - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); int port = grpc_pick_unused_port_or_die(); gpr_asprintf(&addr, "[::]:%d", port); grpc_server_register_completion_queue(server1, cq, NULL); @@ -128,7 +130,8 @@ void test_bind_server_to_addr(const char *host, bool secure) { } else { GPR_ASSERT(grpc_server_add_insecure_http2_port(server, addr)); } - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); grpc_server_register_completion_queue(server, cq, NULL); grpc_server_start(server); grpc_server_shutdown_and_notify(server, cq, NULL); diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc index 89ed9249ad..542f396d10 100644 --- a/test/cpp/grpclb/grpclb_test.cc +++ b/test/cpp/grpclb/grpclb_test.cc @@ -354,8 +354,9 @@ static void start_backend_server(server_fixture *sf) { } GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); const string expected_token = - strlen(sf->lb_token_prefix) == 0 ? "" : sf->lb_token_prefix + - std::to_string(sf->port); + strlen(sf->lb_token_prefix) == 0 + ? "" + : sf->lb_token_prefix + std::to_string(sf->port); GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token", expected_token.c_str())); @@ -593,7 +594,7 @@ static void setup_client(const server_fixture *lb_server, grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1); gpr_free(expected_target_names); - cf->cq = grpc_completion_queue_create(NULL); + cf->cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); cf->server_uri = lb_uri; grpc_channel_credentials *fake_creds = grpc_fake_transport_security_credentials_create(); @@ -616,7 +617,7 @@ static void teardown_client(client_fixture *cf) { static void setup_server(const char *host, server_fixture *sf) { int assigned_port; - sf->cq = grpc_completion_queue_create(NULL); + sf->cq = grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); const char *colon_idx = strchr(host, ':'); if (colon_idx) { const char *port_str = colon_idx + 1; @@ -643,10 +644,15 @@ static void teardown_server(server_fixture *sf) { if (!sf->server) return; gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport); - grpc_server_shutdown_and_notify(sf->server, sf->cq, tag(1000)); - GPR_ASSERT(grpc_completion_queue_pluck( - sf->cq, tag(1000), grpc_timeout_seconds_to_deadline(5), NULL) + + grpc_completion_queue *shutdown_cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, NON_POLLING, NULL); + grpc_server_shutdown_and_notify(sf->server, shutdown_cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000), + grpc_timeout_seconds_to_deadline(5), + NULL) .type == GRPC_OP_COMPLETE); + grpc_completion_queue_destroy(shutdown_cq); grpc_server_destroy(sf->server); gpr_thd_join(sf->tid); diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc index e2e5bbbe00..21e3374f91 100644 --- a/test/cpp/microbenchmarks/bm_call_create.cc +++ b/test/cpp/microbenchmarks/bm_call_create.cc @@ -90,7 +90,8 @@ class LameChannel : public BaseChannelFixture { template <class Fixture> static void BM_CallCreateDestroy(benchmark::State &state) { Fixture fixture; - grpc_completion_queue *cq = grpc_completion_queue_create(NULL); + grpc_completion_queue *cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); void *method_hdl = grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL); diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc index c017474bf4..c7bd8743a5 100644 --- a/test/cpp/microbenchmarks/bm_cq.cc +++ b/test/cpp/microbenchmarks/bm_cq.cc @@ -66,7 +66,10 @@ BENCHMARK(BM_CreateDestroyCpp); static void BM_CreateDestroyCore(benchmark::State& state) { while (state.KeepRunning()) { - grpc_completion_queue_destroy(grpc_completion_queue_create(NULL)); + // TODO: sreek Make this a templatized benchmark and pass completion type + // and polling type as parameters + grpc_completion_queue_destroy( + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL)); } } BENCHMARK(BM_CreateDestroyCore); @@ -98,7 +101,10 @@ static void BM_Pass1Cpp(benchmark::State& state) { BENCHMARK(BM_Pass1Cpp); static void BM_Pass1Core(benchmark::State& state) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Make this templatized benchmark and pass polling_type as a + // param + grpc_completion_queue* cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; @@ -114,7 +120,10 @@ static void BM_Pass1Core(benchmark::State& state) { BENCHMARK(BM_Pass1Core); static void BM_Pluck1Core(benchmark::State& state) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Make this templatized benchmark and pass polling_type as a + // param + grpc_completion_queue* cq = + grpc_completion_queue_create(GRPC_CQ_PLUCK, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_cq_completion completion; @@ -130,7 +139,10 @@ static void BM_Pluck1Core(benchmark::State& state) { BENCHMARK(BM_Pluck1Core); static void BM_EmptyCore(benchmark::State& state) { - grpc_completion_queue* cq = grpc_completion_queue_create(NULL); + // TODO: sreek Make this a templatized benchmark and pass polling_type as a + // param + grpc_completion_queue* cq = + grpc_completion_queue_create(GRPC_CQ_NEXT, DEFAULT_POLLING, NULL); gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); while (state.KeepRunning()) { grpc_completion_queue_next(cq, deadline, NULL); |