aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Sree Kuchibhotla <sreek@google.com>2017-03-22 03:01:24 -0700
committerGravatar Sree Kuchibhotla <sreek@google.com>2017-03-22 03:01:24 -0700
commitf2c32150efe7d980882f1fb2c89c7d0db551cf6c (patch)
tree32f08d4d36bbd73d8e104f0b0700a9f1b0968979
parente2119ac8085abd9565ffb5a58ae42b469cf22c55 (diff)
Update C++ code
-rw-r--r--include/grpc++/impl/codegen/client_unary_call.h2
-rw-r--r--include/grpc++/impl/codegen/completion_queue.h19
-rw-r--r--include/grpc++/impl/codegen/core_codegen.h7
-rw-r--r--include/grpc++/impl/codegen/core_codegen_interface.h7
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h8
-rw-r--r--src/cpp/common/core_codegen.cc11
-rw-r--r--src/cpp/server/server_cc.cc5
-rw-r--r--test/cpp/grpclb/grpclb_test.cc13
-rw-r--r--test/cpp/microbenchmarks/bm_call_create.cc3
-rw-r--r--test/cpp/microbenchmarks/bm_cq.cc12
10 files changed, 44 insertions, 43 deletions
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h
index d8085a0a7f..6218e3ed1b 100644
--- a/include/grpc++/impl/codegen/client_unary_call.h
+++ b/include/grpc++/impl/codegen/client_unary_call.h
@@ -52,7 +52,7 @@ template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
- CompletionQueue cq(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING);
+ CompletionQueue cq(true); // Pluckable completion queue
Call call(channel->CreateCall(method, context, &cq));
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index f34b82dad2..0130e9ca0f 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -103,7 +103,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
public:
/// Default constructor. Implicitly creates a \a grpc_completion_queue
/// instance.
- CompletionQueue() : CompletionQueue(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING) {}
+ CompletionQueue() : CompletionQueue(false) {}
/// Wrap \a take, taking ownership of the instance.
///
@@ -147,8 +147,9 @@ class CompletionQueue : private GrpcLibraryCodegen {
///
/// \return true if read a regular event, false if the queue is shutting down.
bool Next(void** tag, bool* ok) {
- return (AsyncNextInternal(tag, ok, g_core_codegen_interface->gpr_inf_future(
- GPR_CLOCK_REALTIME)) != SHUTDOWN);
+ return (AsyncNextInternal(tag, ok,
+ g_core_codegen_interface->gpr_inf_future(
+ GPR_CLOCK_REALTIME)) != SHUTDOWN);
}
/// Request the shutdown of the queue.
@@ -217,10 +218,14 @@ class CompletionQueue : private GrpcLibraryCodegen {
OutputMessage* result);
/// Private constructor of CompletionQueue only visible to friend classes
- CompletionQueue(grpc_cq_completion_type completion_type,
- grpc_cq_polling_type polling_type) {
- cq_ = g_core_codegen_interface->grpc_completion_queue_create(
- completion_type, polling_type, nullptr);
+ CompletionQueue(bool is_pluck) {
+ if (is_pluck) {
+ cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_pluck(
+ nullptr);
+ } else {
+ cq_ = g_core_codegen_interface->grpc_completion_queue_create_for_next(
+ nullptr);
+ }
InitialAvalanching(); // reserve this for the future shutdown
}
diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h
index 4c8567f770..af7abdf898 100644
--- a/include/grpc++/impl/codegen/core_codegen.h
+++ b/include/grpc++/impl/codegen/core_codegen.h
@@ -46,9 +46,10 @@ namespace grpc {
/// Implementation of the core codegen interface.
class CoreCodegen : public CoreCodegenInterface {
private:
- grpc_completion_queue* grpc_completion_queue_create(
- grpc_cq_completion_type completion_type,
- grpc_cq_polling_type polling_type, void* reserved) override;
+ grpc_completion_queue* grpc_completion_queue_create_for_next(
+ void* reserved) override;
+ grpc_completion_queue* grpc_completion_queue_create_for_pluck(
+ void* reserved) override;
void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
gpr_timespec deadline,
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index 8f2b4a3b76..fd4767a80a 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -60,9 +60,10 @@ class CoreCodegenInterface {
virtual void assert_fail(const char* failed_assertion, const char* file,
int line) = 0;
- virtual grpc_completion_queue* grpc_completion_queue_create(
- grpc_cq_completion_type completion_type,
- grpc_cq_polling_type polling_type, void* reserved) = 0;
+ virtual grpc_completion_queue* grpc_completion_queue_create_for_next(
+ void* reserved) = 0;
+ virtual grpc_completion_queue* grpc_completion_queue_create_for_pluck(
+ void* reserved) = 0;
virtual void grpc_completion_queue_destroy(grpc_completion_queue* cq) = 0;
virtual grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq,
void* tag,
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index c09ab5e647..4e12f3261e 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -138,7 +138,7 @@ class ClientReader final : public ClientReaderInterface<R> {
ClientReader(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const W& request)
: context_(context),
- cq_(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING),
+ cq_(true), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
CallOpClientSendClose>
@@ -212,7 +212,7 @@ class ClientWriter : public ClientWriterInterface<W> {
ClientWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, R* response)
: context_(context),
- cq_(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING),
+ cq_(true), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
@@ -297,7 +297,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context)
: context_(context),
- cq_(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING),
+ cq_(true), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_)) {
CallOpSet<CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(context->send_initial_metadata_,
@@ -512,7 +512,7 @@ class ServerReaderWriterBody final {
Call* const call_;
ServerContext* const ctx_;
};
-}
+} // namespace internal
// class to represent the user API for a bidirectional streaming call
template <class W, class R>
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 81b32938b8..7d8a733912 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -54,11 +54,14 @@ struct grpc_byte_buffer;
namespace grpc {
-grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
- grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_next(
void* reserved) {
- return ::grpc_completion_queue_create(completion_type, polling_type,
- reserved);
+ return ::grpc_completion_queue_create_for_next(reserved);
+}
+
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create_for_pluck(
+ void* reserved) {
+ return ::grpc_completion_queue_create_for_pluck(reserved);
}
void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index b11ea725e1..6adabef763 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -153,10 +153,7 @@ class Server::SyncRequest final : public CompletionQueueTag {
grpc_metadata_array_destroy(&request_metadata_);
}
- void SetupRequest() {
- cq_ = grpc_completion_queue_create(GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
- nullptr);
- }
+ void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
diff --git a/test/cpp/grpclb/grpclb_test.cc b/test/cpp/grpclb/grpclb_test.cc
index c9c569cfbe..b85ad949a5 100644
--- a/test/cpp/grpclb/grpclb_test.cc
+++ b/test/cpp/grpclb/grpclb_test.cc
@@ -354,8 +354,9 @@ static void start_backend_server(server_fixture *sf) {
}
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
const string expected_token =
- strlen(sf->lb_token_prefix) == 0 ? "" : sf->lb_token_prefix +
- std::to_string(sf->port);
+ strlen(sf->lb_token_prefix) == 0
+ ? ""
+ : sf->lb_token_prefix + std::to_string(sf->port);
GPR_ASSERT(contains_metadata(&request_metadata_recv, "lb-token",
expected_token.c_str()));
@@ -593,8 +594,7 @@ static void setup_client(const server_fixture *lb_server,
grpc_channel_args_copy_and_add(NULL, &expected_target_arg, 1);
gpr_free(expected_target_names);
- cf->cq =
- grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
+ cf->cq = grpc_completion_queue_create_for_next(NULL);
cf->server_uri = lb_uri;
grpc_channel_credentials *fake_creds =
grpc_fake_transport_security_credentials_create();
@@ -617,8 +617,7 @@ static void teardown_client(client_fixture *cf) {
static void setup_server(const char *host, server_fixture *sf) {
int assigned_port;
- sf->cq =
- grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
+ sf->cq = grpc_completion_queue_create_for_next(NULL);
const char *colon_idx = strchr(host, ':');
if (colon_idx) {
const char *port_str = colon_idx + 1;
@@ -647,7 +646,7 @@ static void teardown_server(server_fixture *sf) {
gpr_log(GPR_INFO, "Server[%s] shutting down", sf->servers_hostport);
grpc_completion_queue *shutdown_cq =
- grpc_completion_queue_create(GRPC_CQ_PLUCK, GRPC_CQ_NON_POLLING, NULL);
+ grpc_completion_queue_create_for_pluck(NULL);
grpc_server_shutdown_and_notify(sf->server, shutdown_cq, tag(1000));
GPR_ASSERT(grpc_completion_queue_pluck(shutdown_cq, tag(1000),
grpc_timeout_seconds_to_deadline(5),
diff --git a/test/cpp/microbenchmarks/bm_call_create.cc b/test/cpp/microbenchmarks/bm_call_create.cc
index ec6073e786..24d2a1b85f 100644
--- a/test/cpp/microbenchmarks/bm_call_create.cc
+++ b/test/cpp/microbenchmarks/bm_call_create.cc
@@ -114,8 +114,7 @@ template <class Fixture>
static void BM_CallCreateDestroy(benchmark::State &state) {
TrackCounters track_counters;
Fixture fixture;
- grpc_completion_queue *cq =
- grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
+ grpc_completion_queue *cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
void *method_hdl =
grpc_channel_register_call(fixture.channel(), "/foo/bar", NULL, NULL);
diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc
index 14eae82717..eaba7d858c 100644
--- a/test/cpp/microbenchmarks/bm_cq.cc
+++ b/test/cpp/microbenchmarks/bm_cq.cc
@@ -64,8 +64,7 @@ static void BM_CreateDestroyCore(benchmark::State& state) {
while (state.KeepRunning()) {
// TODO: sreek Templatize this benchmark and pass completion type and
// polling type as parameters
- grpc_completion_queue_destroy(grpc_completion_queue_create(
- GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL));
+ grpc_completion_queue_destroy(grpc_completion_queue_create_for_next(NULL));
}
track_counters.Finish(state);
}
@@ -102,8 +101,7 @@ BENCHMARK(BM_Pass1Cpp);
static void BM_Pass1Core(benchmark::State& state) {
TrackCounters track_counters;
// TODO: sreek Templatize this benchmark and pass polling_type as a param
- grpc_completion_queue* cq =
- grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
+ grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_cq_completion completion;
@@ -122,8 +120,7 @@ BENCHMARK(BM_Pass1Core);
static void BM_Pluck1Core(benchmark::State& state) {
TrackCounters track_counters;
// TODO: sreek Templatize this benchmark and pass polling_type as a param
- grpc_completion_queue* cq = grpc_completion_queue_create(
- GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, NULL);
+ grpc_completion_queue* cq = grpc_completion_queue_create_for_pluck(NULL);
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_cq_completion completion;
@@ -142,8 +139,7 @@ BENCHMARK(BM_Pluck1Core);
static void BM_EmptyCore(benchmark::State& state) {
TrackCounters track_counters;
// TODO: sreek Templatize this benchmark and pass polling_type as a param
- grpc_completion_queue* cq =
- grpc_completion_queue_create(GRPC_CQ_NEXT, GRPC_CQ_DEFAULT_POLLING, NULL);
+ grpc_completion_queue* cq = grpc_completion_queue_create_for_next(NULL);
gpr_timespec deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
while (state.KeepRunning()) {
grpc_completion_queue_next(cq, deadline, NULL);