aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/impl/codegen/client_unary_call.h4
-rw-r--r--include/grpc++/impl/codegen/completion_queue.h32
-rw-r--r--include/grpc++/impl/codegen/core_codegen.h9
-rw-r--r--include/grpc++/impl/codegen/core_codegen_interface.h6
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h12
-rw-r--r--include/grpc/grpc.h23
-rw-r--r--include/grpc/impl/codegen/grpc_types.h23
-rw-r--r--src/core/lib/surface/completion_queue.c21
-rw-r--r--src/core/lib/surface/server.c7
-rw-r--r--src/cpp/common/core_codegen.cc12
-rw-r--r--src/cpp/server/server_builder.cc26
-rw-r--r--test/cpp/end2end/async_end2end_test.cc28
12 files changed, 134 insertions, 69 deletions
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h
index a5a4f3d739..4bf35ae778 100644
--- a/include/grpc++/impl/codegen/client_unary_call.h
+++ b/include/grpc++/impl/codegen/client_unary_call.h
@@ -52,7 +52,9 @@ template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
- CompletionQueue cq(true); // Pluckable completion queue
+ CompletionQueue cq(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue
Call call(channel->CreateCall(method, context, &cq));
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index 906a64693e..c8ab726b0f 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -102,7 +102,9 @@ class CompletionQueue : private GrpcLibraryCodegen {
public:
/// Default constructor. Implicitly creates a \a grpc_completion_queue
/// instance.
- CompletionQueue() : CompletionQueue(false) {}
+ CompletionQueue()
+ : CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING}) {}
/// Wrap \a take, taking ownership of the instance.
///
@@ -182,6 +184,16 @@ class CompletionQueue : private GrpcLibraryCodegen {
};
void CompleteAvalanching();
+ protected:
+ /// Private constructor of CompletionQueue only visible to friend classes
+ CompletionQueue(const grpc_completion_queue_attributes& attributes) {
+ cq_ = g_core_codegen_interface->grpc_completion_queue_create(
+ g_core_codegen_interface->grpc_completion_queue_factory_lookup(
+ &attributes),
+ &attributes, NULL);
+ InitialAvalanching(); // reserve this for the future shutdown
+ }
+
private:
// Friend synchronous wrappers so that they can access Pluck(), which is
// a semi-private API geared towards the synchronous implementation.
@@ -215,18 +227,6 @@ class CompletionQueue : private GrpcLibraryCodegen {
const InputMessage& request,
OutputMessage* result);
- /// Private constructor of CompletionQueue only visible to friend classes
- CompletionQueue(bool is_pluck) {
- if (is_pluck) {
- cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_pluck(
- nullptr);
- } else {
- cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_next(
- nullptr);
- }
- InitialAvalanching(); // reserve this for the future shutdown
- }
-
NextStatus AsyncNextInternal(void** tag, bool* ok, gpr_timespec deadline);
/// Wraps \a grpc_completion_queue_pluck.
@@ -299,11 +299,9 @@ class ServerCompletionQueue : public CompletionQueue {
/// AsyncNext()). By default all server completion queues are assumed to be
/// frequently polled.
ServerCompletionQueue(grpc_cq_polling_type polling_type)
- : CompletionQueue(MakeCompletionQueue(polling_type)),
+ : CompletionQueue(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_NEXT, polling_type}),
polling_type_(polling_type) {}
-
- static grpc_completion_queue* MakeCompletionQueue(
- grpc_cq_polling_type polling_type);
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h
index 65151590b2..3cb7da8ef6 100644
--- a/include/grpc++/impl/codegen/core_codegen.h
+++ b/include/grpc++/impl/codegen/core_codegen.h
@@ -44,8 +44,15 @@
namespace grpc {
/// Implementation of the core codegen interface.
-class CoreCodegen : public CoreCodegenInterface {
+class CoreCodegen final : public CoreCodegenInterface {
private:
+ virtual const grpc_completion_queue_factory*
+ grpc_completion_queue_factory_lookup(
+ const grpc_completion_queue_attributes* attributes) override;
+ virtual grpc_completion_queue* grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attributes,
+ void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) override;
grpc_completion_queue* grpc_completion_queue_create_for_pluck(
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index 529bef687b..a1a0aaf3ca 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -59,6 +59,12 @@ class CoreCodegenInterface {
virtual void assert_fail(const char* failed_assertion, const char* file,
int line) = 0;
+ virtual const grpc_completion_queue_factory*
+ grpc_completion_queue_factory_lookup(
+ const grpc_completion_queue_attributes* attributes) = 0;
+ virtual grpc_completion_queue* grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attributes, void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_next(
void* reserved) = 0;
virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck(
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 328d5cb1e8..a010924cef 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -156,7 +156,9 @@ class ClientReader final : public ClientReaderInterface<R> {
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context),
- cq_(true), // Pluckable cq
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
@@ -230,7 +232,9 @@ class ClientWriter : public ClientWriterInterface<W> {
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context),
- cq_(true), // Pluckable cq
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
@@ -330,7 +334,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context),
- cq_(true), // Pluckable cq
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
if (!context_->initial_metadata_corked_) {
CallOpSet<CallOpSendInitialMetadata> ops;
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index f3201edad2..1a7d0120bf 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -93,29 +93,6 @@ GRPCAPI const char *grpc_version_string(void);
/** Return a string specifying what the 'g' in gRPC stands for */
GRPCAPI const char *grpc_g_stands_for(void);
-/** Specifies the type of APIs to use to pop events from the completion queue */
-typedef enum {
- /** Events are popped out by calling grpc_completion_queue_next() API ONLY */
- GRPC_CQ_NEXT = 1,
-
- /** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/
- GRPC_CQ_PLUCK
-} grpc_cq_completion_type;
-
-#define GRPC_CQ_CURRENT_VERSION 1
-typedef struct grpc_completion_queue_attributes {
- /* The version number of this structure. More fields might be added to this
- structure in future. */
- int version; /* Set to GRPC_CQ_CURRENT_VERSION */
-
- grpc_cq_completion_type cq_completion_type;
-
- grpc_cq_polling_type cq_polling_type;
-} grpc_completion_queue_attributes;
-
-/** The completion queue factory structure is opaque to the callers of grpc */
-typedef struct grpc_completion_queue_factory grpc_completion_queue_factory;
-
/** Returns the completion queue factory based on the attributes. MAY return a
NULL if no factory can be found */
GRPCAPI const grpc_completion_queue_factory *
diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h
index 02732205f5..a8eda739bf 100644
--- a/include/grpc/impl/codegen/grpc_types.h
+++ b/include/grpc/impl/codegen/grpc_types.h
@@ -569,6 +569,29 @@ typedef enum {
GRPC_CQ_NON_POLLING
} grpc_cq_polling_type;
+/** Specifies the type of APIs to use to pop events from the completion queue */
+typedef enum {
+ /** Events are popped out by calling grpc_completion_queue_next() API ONLY */
+ GRPC_CQ_NEXT = 1,
+
+ /** Events are popped out by calling grpc_completion_queue_pluck() API ONLY*/
+ GRPC_CQ_PLUCK
+} grpc_cq_completion_type;
+
+#define GRPC_CQ_CURRENT_VERSION 1
+typedef struct grpc_completion_queue_attributes {
+ /* The version number of this structure. More fields might be added to this
+ structure in future. */
+ int version; /* Set to GRPC_CQ_CURRENT_VERSION */
+
+ grpc_cq_completion_type cq_completion_type;
+
+ grpc_cq_polling_type cq_polling_type;
+} grpc_completion_queue_attributes;
+
+/** The completion queue factory structure is opaque to the callers of grpc */
+typedef struct grpc_completion_queue_factory grpc_completion_queue_factory;
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index e17f094837..ea97a6f374 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -61,6 +61,7 @@ typedef struct {
} plucker;
typedef struct {
+ bool can_get_pollset;
size_t (*size)(void);
void (*init)(grpc_pollset *pollset, gpr_mu **mu);
grpc_error *(*kick)(grpc_pollset *pollset,
@@ -107,9 +108,10 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
gpr_timespec now,
gpr_timespec deadline) {
non_polling_poller *npp = (non_polling_poller *)pollset;
+ if (npp->shutdown) return GRPC_ERROR_NONE;
non_polling_worker w;
gpr_cv_init(&w.cv);
- *worker = (grpc_pollset_worker *)&w;
+ if (worker != NULL) *worker = (grpc_pollset_worker *)&w;
if (npp->root == NULL) {
npp->root = w.next = w.prev = &w;
} else {
@@ -128,11 +130,11 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx,
}
npp->root = NULL;
}
- w.next->prev = w.prev;
- w.prev->next = w.next;
}
+ w.next->prev = w.prev;
+ w.prev->next = w.next;
gpr_cv_destroy(&w.cv);
- *worker = NULL;
+ if (worker != NULL) *worker = NULL;
return GRPC_ERROR_NONE;
}
@@ -169,21 +171,24 @@ static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx,
static const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
/* GRPC_CQ_DEFAULT_POLLING */
- {.size = grpc_pollset_size,
+ {.can_get_pollset = true,
+ .size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_LISTENING */
- {.size = grpc_pollset_size,
+ {.can_get_pollset = true,
+ .size = grpc_pollset_size,
.init = grpc_pollset_init,
.kick = grpc_pollset_kick,
.work = grpc_pollset_work,
.shutdown = grpc_pollset_shutdown,
.destroy = grpc_pollset_destroy},
/* GRPC_CQ_NON_POLLING */
- {.size = non_polling_poller_size,
+ {.can_get_pollset = false,
+ .size = non_polling_poller_size,
.init = non_polling_poller_init,
.kick = non_polling_poller_kick,
.work = non_polling_poller_work,
@@ -837,7 +842,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
}
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
- return POLLSET_FROM_CQ(cc);
+ return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL;
}
grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) {
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 9496f90390..767c91a5ec 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1009,6 +1009,8 @@ void grpc_server_register_completion_queue(grpc_server *server,
calls grpc_completion_queue_pluck() on server completion queues */
}
+ GPR_ASSERT(grpc_cq_pollset(cq));
+
register_completion_queue(server, cq, false, reserved);
}
@@ -1102,8 +1104,9 @@ void grpc_server_start(grpc_server *server) {
gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count);
for (i = 0; i < server->cq_count; i++) {
if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) {
- server->pollsets[server->pollset_count++] =
- grpc_cq_pollset(server->cqs[i]);
+ grpc_pollset *pollset = grpc_cq_pollset(server->cqs[i]);
+ GPR_ASSERT(pollset);
+ server->pollsets[server->pollset_count++] = pollset;
}
server->request_freelist_per_cq[i] =
gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq);
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 0dd758ec4e..8f1de222fb 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -54,6 +54,18 @@ struct grpc_byte_buffer;
namespace grpc {
+const grpc_completion_queue_factory*
+CoreCodegen::grpc_completion_queue_factory_lookup(
+ const grpc_completion_queue_attributes* attributes) {
+ return ::grpc_completion_queue_factory_lookup(attributes);
+}
+
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
+ const grpc_completion_queue_factory* factory,
+ const grpc_completion_queue_attributes* attributes, void* reserved) {
+ return ::grpc_completion_queue_create(factory, attributes, reserved);
+}
+
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
return ::grpc_completion_queue_create_for_next(reserved);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index c6784ea159..6687fe78b9 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -243,6 +243,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_cqs(std::make_shared<
std::vector<std::unique_ptr<ServerCompletionQueue>>>());
+ int num_frequently_polled_cqs = 0;
+ for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
+ if ((*it)->IsFrequentlyPolled()) {
+ num_frequently_polled_cqs++;
+ }
+ }
+
+ const bool is_hybrid_server =
+ has_sync_methods && num_frequently_polled_cqs > 0;
+
if (has_sync_methods) {
// This is a Sync server
gpr_log(GPR_INFO,
@@ -253,7 +263,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.cq_timeout_msec);
grpc_cq_polling_type polling_type =
- cqs_.empty() ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_POLLING;
+ is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
// Create completion queues to listen to incoming rpc requests
for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
@@ -273,12 +283,15 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
// server
// 2. cqs_: Completion queues added via AddCompletionQueue() call
- // All sync cqs (if any) are frequently polled by ThreadManager
- int num_frequently_polled_cqs = sync_server_cqs->size();
-
for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
- grpc_server_register_completion_queue(server->server_, (*it)->cq(),
- nullptr);
+ if (is_hybrid_server) {
+ grpc_server_register_non_listening_completion_queue(server->server_,
+ (*it)->cq(), nullptr);
+ } else {
+ grpc_server_register_completion_queue(server->server_, (*it)->cq(),
+ nullptr);
+ }
+ num_frequently_polled_cqs++;
}
// cqs_ contains the completion queue added by calling the ServerBuilder's
@@ -290,7 +303,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if ((*it)->IsFrequentlyPolled()) {
grpc_server_register_completion_queue(server->server_, (*it)->cq(),
nullptr);
- num_frequently_polled_cqs++;
} else {
grpc_server_register_non_listening_completion_queue(server->server_,
(*it)->cq(), nullptr);
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 0b5215ef8e..cc3958bf13 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -38,6 +38,7 @@
#include <grpc++/channel.h>
#include <grpc++/client_context.h>
#include <grpc++/create_channel.h>
+#include <grpc++/ext/health_check_service_server_builder_option.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
@@ -49,6 +50,7 @@
#include <gtest/gtest.h>
#include "src/core/lib/iomgr/port.h"
+#include "src/proto/grpc/health/v1/health.grpc.pb.h"
#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
@@ -224,13 +226,15 @@ class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
class TestScenario {
public:
- TestScenario(bool non_block, const grpc::string& creds_type,
+ TestScenario(bool non_block, const grpc::string& creds_type, bool hcs,
const grpc::string& content)
: disable_blocking(non_block),
+ health_check_service(hcs),
credentials_type(creds_type),
message_content(content) {}
void Log() const;
bool disable_blocking;
+ bool health_check_service;
// Although the below grpc::string's are logically const, we can't declare
// them const because of a limitation in the way old compilers (e.g., gcc-4.4)
// manage vector insertion using a copy constructor
@@ -243,6 +247,8 @@ static std::ostream& operator<<(std::ostream& out,
return out << "TestScenario{disable_blocking="
<< (scenario.disable_blocking ? "true" : "false")
<< ", credentials='" << scenario.credentials_type
+ << ", health_check_service="
+ << (scenario.health_check_service ? "true" : "false")
<< "', message_size=" << scenario.message_content.size() << "}";
}
@@ -252,6 +258,8 @@ void TestScenario::Log() const {
gpr_log(GPR_DEBUG, "%s", out.str().c_str());
}
+class HealthCheck : public health::v1::Health::Service {};
+
class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
protected:
AsyncEnd2endTest() { GetParam().Log(); }
@@ -268,6 +276,9 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
GetParam().credentials_type);
builder.AddListeningPort(server_address_.str(), server_creds);
builder.RegisterService(&service_);
+ if (GetParam().health_check_service) {
+ builder.RegisterService(&health_check_);
+ }
cq_ = builder.AddCompletionQueue();
// TODO(zyc): make a test option to choose wheather sync plugins should be
@@ -340,6 +351,7 @@ class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
std::unique_ptr<Server> server_;
grpc::testing::EchoTestService::AsyncService service_;
+ HealthCheck health_check_;
std::ostringstream server_address_;
int port_;
@@ -1754,12 +1766,14 @@ std::vector<TestScenario> CreateTestScenarios(bool test_disable_blocking,
messages.push_back(big_msg);
}
- for (auto cred = credentials_types.begin(); cred != credentials_types.end();
- ++cred) {
- for (auto msg = messages.begin(); msg != messages.end(); msg++) {
- scenarios.emplace_back(false, *cred, *msg);
- if (test_disable_blocking) {
- scenarios.emplace_back(true, *cred, *msg);
+ for (auto health_check_service : {false, true}) {
+ for (auto cred = credentials_types.begin(); cred != credentials_types.end();
+ ++cred) {
+ for (auto msg = messages.begin(); msg != messages.end(); msg++) {
+ scenarios.emplace_back(false, *cred, health_check_service, *msg);
+ if (test_disable_blocking) {
+ scenarios.emplace_back(true, *cred, health_check_service, *msg);
+ }
}
}
}