From 1cd9aacab7896d701e9edd5e4988f99bff726868 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 18 Sep 2018 09:54:26 -0700 Subject: Add codegen support for client callback unary calls --- src/compiler/cpp_generator.cc | 147 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 1e0c36451b..ff0b40f587 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -128,6 +128,7 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file, ""); } static const char* headers_strs[] = { + "functional", "grpcpp/impl/codegen/async_generic_service.h", "grpcpp/impl/codegen/async_stream.h", "grpcpp/impl/codegen/async_unary_call.h", @@ -547,6 +548,115 @@ void PrintHeaderClientMethod(grpc_generator::Printer* printer, } } +void PrintHeaderClientMethodCallbackInterfacesStart( + grpc_generator::Printer* printer, + std::map* vars) { + // This declares the interface for the callback-based API. The components + // are pure; even though this is new (post-1.0) API, it can be pure because + // it is an entirely new interface that happens to be scoped within + // StubInterface, not new additions to StubInterface itself + printer->Print("class experimental_async_interface {\n"); + // All methods in this new interface are public. There is no need for private + // "Raw" methods since the callback-based API returns unowned raw pointers + printer->Print(" public:\n"); + printer->Indent(); +} + +void PrintHeaderClientMethodCallbackInterfaces( + grpc_generator::Printer* printer, const grpc_generator::Method* method, + std::map* vars, bool is_public) { + // Reserve is_public for future expansion + assert(is_public); + + (*vars)["Method"] = method->name(); + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + + if (method->NoStreaming()) { + printer->Print(*vars, + "virtual void $Method$(::grpc::ClientContext* context, " + "const $Request$* request, $Response$* response, " + "std::function) = 0;\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add support for client-side streaming + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add support for server-side streaming + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add support for bidi streaming + } +} + +void PrintHeaderClientMethodCallbackInterfacesEnd( + grpc_generator::Printer* printer, + std::map* vars) { + printer->Outdent(); + printer->Print("};\n"); + + // Declare a function to give the async stub contents. It can't be pure + // since this is new API in StubInterface, but it is meaningless by default + // (since any stub that wants to use it must have its own implementation of + // the callback functions therein), so make the default return value nullptr. + // Intentionally include the word "class" to avoid possible shadowing. + printer->Print( + "virtual class experimental_async_interface* experimental_async() { " + "return nullptr; }\n"); +} + +void PrintHeaderClientMethodCallbackStart( + grpc_generator::Printer* printer, + std::map* vars) { + // This declares the stub entry for the callback-based API. + printer->Print("class experimental_async final :\n"); + printer->Print(" public StubInterface::experimental_async_interface {\n"); + printer->Print(" public:\n"); + printer->Indent(); +} + +void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer, + const grpc_generator::Method* method, + std::map* vars, + bool is_public) { + // Reserve is_public for future expansion + assert(is_public); + + (*vars)["Method"] = method->name(); + (*vars)["Request"] = method->input_type_name(); + (*vars)["Response"] = method->output_type_name(); + + if (method->NoStreaming()) { + printer->Print(*vars, + "void $Method$(::grpc::ClientContext* context, " + "const $Request$* request, $Response$* response, " + "std::function) override;\n"); + } else if (ClientOnlyStreaming(method)) { + // TODO(vjpai): Add support for client-side streaming + } else if (ServerOnlyStreaming(method)) { + // TODO(vjpai): Add support for server-side streaming + } else if (method->BidiStreaming()) { + // TODO(vjpai): Add support for bidi streaming + } +} + +void PrintHeaderClientMethodCallbackEnd( + grpc_generator::Printer* printer, + std::map* vars) { + printer->Outdent(); + printer->Print(" private:\n"); + printer->Indent(); + printer->Print("friend class Stub;\n"); + printer->Print("explicit experimental_async(Stub* stub): stub_(stub) { }\n"); + // include a function with a dummy use of stub_ to avoid an unused + // private member warning for service with no methods + printer->Print("Stub* stub() { return stub_; }\n"); + printer->Print("Stub* stub_;\n"); + printer->Outdent(); + printer->Print("};\n"); + + printer->Print( + "class experimental_async_interface* experimental_async() override { " + "return &async_stub_; }\n"); +} + void PrintHeaderClientMethodData(grpc_generator::Printer* printer, const grpc_generator::Method* method, std::map* vars) { @@ -951,6 +1061,14 @@ void PrintHeaderService(grpc_generator::Printer* printer, true); printer->Print(service->method(i)->GetTrailingComments("//").c_str()); } + PrintHeaderClientMethodCallbackInterfacesStart(printer, vars); + for (int i = 0; i < service->method_count(); ++i) { + printer->Print(service->method(i)->GetLeadingComments("//").c_str()); + PrintHeaderClientMethodCallbackInterfaces(printer, service->method(i).get(), + vars, true); + printer->Print(service->method(i)->GetTrailingComments("//").c_str()); + } + PrintHeaderClientMethodCallbackInterfacesEnd(printer, vars); printer->Outdent(); printer->Print("private:\n"); printer->Indent(); @@ -970,10 +1088,17 @@ void PrintHeaderService(grpc_generator::Printer* printer, for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, true); } + PrintHeaderClientMethodCallbackStart(printer, vars); + for (int i = 0; i < service->method_count(); ++i) { + PrintHeaderClientMethodCallback(printer, service->method(i).get(), vars, + true); + } + PrintHeaderClientMethodCallbackEnd(printer, vars); printer->Outdent(); printer->Print("\n private:\n"); printer->Indent(); printer->Print("std::shared_ptr< ::grpc::ChannelInterface> channel_;\n"); + printer->Print("class experimental_async async_stub_{this};\n"); for (int i = 0; i < service->method_count(); ++i) { PrintHeaderClientMethod(printer, service->method(i).get(), vars, false); } @@ -1199,10 +1324,12 @@ grpc::string GetSourceIncludes(grpc_generator::File* file, std::map vars; static const char* headers_strs[] = { + "functional", "grpcpp/impl/codegen/async_stream.h", "grpcpp/impl/codegen/async_unary_call.h", "grpcpp/impl/codegen/channel_interface.h", "grpcpp/impl/codegen/client_unary_call.h", + "grpcpp/impl/codegen/client_callback.h", "grpcpp/impl/codegen/method_handler_impl.h", "grpcpp/impl/codegen/rpc_service_method.h", "grpcpp/impl/codegen/service_type.h", @@ -1247,6 +1374,17 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, " return ::grpc::internal::BlockingUnaryCall" "(channel_.get(), rpcmethod_$Method$_, " "context, request, response);\n}\n\n"); + + printer->Print(*vars, + "void $ns$$Service$::Stub::experimental_async::$Method$(" + "::grpc::ClientContext* context, " + "const $Request$* request, $Response$* response, " + "std::function f) {\n"); + printer->Print(*vars, + " return ::grpc::internal::CallbackUnaryCall" + "(stub_->channel_.get(), stub_->rpcmethod_$Method$_, " + "context, request, response, std::move(f));\n}\n\n"); + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1277,6 +1415,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "rpcmethod_$Method$_, " "context, response);\n" "}\n\n"); + + // TODO(vjpai): Add callback version + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1308,6 +1449,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "rpcmethod_$Method$_, " "context, request);\n" "}\n\n"); + + // TODO(vjpai): Add callback version + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; @@ -1339,6 +1483,9 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "rpcmethod_$Method$_, " "context);\n" "}\n\n"); + + // TODO(vjpai): Add callback version + for (auto async_prefix : async_prefixes) { (*vars)["AsyncPrefix"] = async_prefix.prefix; (*vars)["AsyncStart"] = async_prefix.start; -- cgit v1.2.3 From c5afb25905965e3f9213eac72d3ab62640eb28f6 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 18 Sep 2018 09:57:17 -0700 Subject: fix golden file --- test/cpp/codegen/compiler_test_golden | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index 756f6a0224..c679880763 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -26,6 +26,7 @@ #include "src/proto/grpc/testing/compiler_test.pb.h" +#include #include #include #include @@ -105,6 +106,22 @@ class ServiceA final { return std::unique_ptr< ::grpc::ClientAsyncReaderWriterInterface< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); } // Method A4 trailing comment 1 + class experimental_async_interface { + public: + // MethodA1 leading comment 1 + virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) = 0; + // MethodA1 trailing comment 1 + // MethodA2 detached leading comment 1 + // + // Method A2 leading comment 1 + // Method A2 leading comment 2 + // MethodA2 trailing comment 1 + // Method A3 leading comment 1 + // Method A3 trailing comment 1 + // Method A4 leading comment 1 + // Method A4 trailing comment 1 + }; + virtual class experimental_async_interface* experimental_async() { return nullptr; } private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; @@ -155,9 +172,21 @@ class ServiceA final { std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>> PrepareAsyncMethodA4(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>>(PrepareAsyncMethodA4Raw(context, cq)); } + class experimental_async final : + public StubInterface::experimental_async_interface { + public: + void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) override; + private: + friend class Stub; + explicit experimental_async(Stub* stub): stub_(stub) { } + Stub* stub() { return stub_; } + Stub* stub_; + }; + class experimental_async_interface* experimental_async() override { return &async_stub_; } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; + class experimental_async async_stub_{this}; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodA1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientWriter< ::grpc::testing::Request>* MethodA2Raw(::grpc::ClientContext* context, ::grpc::testing::Response* response) override; @@ -488,6 +517,13 @@ class ServiceB final { return std::unique_ptr< ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); } // MethodB1 trailing comment 1 + class experimental_async_interface { + public: + // MethodB1 leading comment 1 + virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) = 0; + // MethodB1 trailing comment 1 + }; + virtual class experimental_async_interface* experimental_async() { return nullptr; } private: virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; virtual ::grpc::ClientAsyncResponseReaderInterface< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) = 0; @@ -502,9 +538,21 @@ class ServiceB final { std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>> PrepareAsyncMethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) { return std::unique_ptr< ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>>(PrepareAsyncMethodB1Raw(context, request, cq)); } + class experimental_async final : + public StubInterface::experimental_async_interface { + public: + void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) override; + private: + friend class Stub; + explicit experimental_async(Stub* stub): stub_(stub) { } + Stub* stub() { return stub_; } + Stub* stub_; + }; + class experimental_async_interface* experimental_async() override { return &async_stub_; } private: std::shared_ptr< ::grpc::ChannelInterface> channel_; + class experimental_async async_stub_{this}; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; ::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override; const ::grpc::internal::RpcMethod rpcmethod_MethodB1_; -- cgit v1.2.3 From 42a0ed43cba18b0c9dc1d181ad50b9f2e0b89e69 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 18 Sep 2018 12:04:31 -0700 Subject: Add a test --- test/cpp/end2end/client_callback_end2end_test.cc | 44 ++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 75b896b33d..29893b451c 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -55,7 +55,8 @@ class ClientCallbackEnd2endTest : public ::testing::Test { void ResetStub() { ChannelArguments args; channel_ = server_->InProcessChannel(args); - stub_.reset(new GenericStub(channel_)); + stub_ = grpc::testing::EchoTestService::NewStub(channel_); + generic_stub_.reset(new GenericStub(channel_)); } void TearDown() override { @@ -65,6 +66,36 @@ class ClientCallbackEnd2endTest : public ::testing::Test { } void SendRpcs(int num_rpcs) { + grpc::string test_string(""); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest request; + EchoResponse response; + ClientContext cli_ctx; + + test_string += "Hello world. "; + request.set_message(test_string); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &cli_ctx, &request, &response, + [&request, &response, &done, &mu, &cv](Status s) { + GPR_ASSERT(s.ok()); + + EXPECT_EQ(request.message(), response.message()); + std::lock_guard l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } + } + } + + void SendRpcsGeneric(int num_rpcs) { const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); grpc::string test_string(""); for (int i = 0; i < num_rpcs; i++) { @@ -80,7 +111,7 @@ class ClientCallbackEnd2endTest : public ::testing::Test { std::mutex mu; std::condition_variable cv; bool done = false; - stub_->experimental().UnaryCall( + generic_stub_->experimental().UnaryCall( &cli_ctx, kMethodName, send_buf.get(), &recv_buf, [&request, &recv_buf, &done, &mu, &cv](Status s) { GPR_ASSERT(s.ok()); @@ -98,9 +129,11 @@ class ClientCallbackEnd2endTest : public ::testing::Test { } } } + bool is_server_started_; std::shared_ptr channel_; - std::unique_ptr stub_; + std::unique_ptr stub_; + std::unique_ptr generic_stub_; TestServiceImpl service_; std::unique_ptr server_; }; @@ -115,6 +148,11 @@ TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) { SendRpcs(10); } +TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) { + ResetStub(); + SendRpcsGeneric(10); +} + } // namespace } // namespace testing } // namespace grpc -- cgit v1.2.3 From 47ae48e20c9700eb2b5e5e26037bdef401ea7f16 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 18 Sep 2018 12:10:33 -0700 Subject: Fix conflict --- test/cpp/end2end/client_callback_end2end_test.cc | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 77ff0a5c95..90c8845643 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -65,7 +65,6 @@ class ClientCallbackEnd2endTest : public ::testing::Test { } } -<<<<<<< HEAD void SendRpcs(int num_rpcs) { grpc::string test_string(""); for (int i = 0; i < num_rpcs; i++) { @@ -96,10 +95,7 @@ class ClientCallbackEnd2endTest : public ::testing::Test { } } - void SendRpcsGeneric(int num_rpcs) { -======= - void SendRpcs(int num_rpcs, bool maybe_except) { ->>>>>>> master + void SendRpcsGeneric(int num_rpcs, bool maybe_except) { const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo"); grpc::string test_string(""); for (int i = 0; i < num_rpcs; i++) { @@ -149,23 +145,23 @@ class ClientCallbackEnd2endTest : public ::testing::Test { TEST_F(ClientCallbackEnd2endTest, SimpleRpc) { ResetStub(); - SendRpcs(1, false); + SendRpcs(1); } TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) { ResetStub(); - SendRpcs(10, false); + SendRpcs(10); } TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) { ResetStub(); - SendRpcsGeneric(10); + SendRpcsGeneric(10, false); } #if GRPC_ALLOW_EXCEPTIONS TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) { ResetStub(); - SendRpcs(10, true); + SendRpcsGeneric(10, true); } #endif -- cgit v1.2.3 From 845bc9ae217a99562fdff2f97a6e984446e601df Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 18 Sep 2018 16:48:46 -0700 Subject: Add more test cases --- test/cpp/end2end/client_callback_end2end_test.cc | 69 ++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/test/cpp/end2end/client_callback_end2end_test.cc b/test/cpp/end2end/client_callback_end2end_test.cc index 90c8845643..e99703c30f 100644 --- a/test/cpp/end2end/client_callback_end2end_test.cc +++ b/test/cpp/end2end/client_callback_end2end_test.cc @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -65,7 +66,7 @@ class ClientCallbackEnd2endTest : public ::testing::Test { } } - void SendRpcs(int num_rpcs) { + void SendRpcs(int num_rpcs, bool with_binary_metadata) { grpc::string test_string(""); for (int i = 0; i < num_rpcs; i++) { EchoRequest request; @@ -75,6 +76,14 @@ class ClientCallbackEnd2endTest : public ::testing::Test { test_string += "Hello world. "; request.set_message(test_string); + if (with_binary_metadata) { + char bytes[8] = {'\0', '\1', '\2', '\3', + '\4', '\5', '\6', static_cast(i)}; + cli_ctx.AddMetadata("custom-bin", grpc::string(bytes, 8)); + } + + cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); + std::mutex mu; std::condition_variable cv; bool done = false; @@ -145,12 +154,17 @@ class ClientCallbackEnd2endTest : public ::testing::Test { TEST_F(ClientCallbackEnd2endTest, SimpleRpc) { ResetStub(); - SendRpcs(1); + SendRpcs(1, false); } TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) { ResetStub(); - SendRpcs(10); + SendRpcs(10, false); +} + +TEST_F(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) { + ResetStub(); + SendRpcs(10, true); } TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) { @@ -165,6 +179,55 @@ TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) { } #endif +TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { + ResetStub(); + std::vector threads; + threads.reserve(10); + for (int i = 0; i < 10; ++i) { + threads.emplace_back([this] { SendRpcs(10, true); }); + } + for (int i = 0; i < 10; ++i) { + threads[i].join(); + } +} + +TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) { + ResetStub(); + std::vector threads; + threads.reserve(10); + for (int i = 0; i < 10; ++i) { + threads.emplace_back([this] { SendRpcs(10, false); }); + } + for (int i = 0; i < 10; ++i) { + threads[i].join(); + } +} + +TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) { + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.TryCancel(); + + std::mutex mu; + std::condition_variable cv; + bool done = false; + stub_->experimental_async()->Echo( + &context, &request, &response, [&response, &done, &mu, &cv](Status s) { + EXPECT_EQ("", response.message()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + std::lock_guard l(mu); + done = true; + cv.notify_one(); + }); + std::unique_lock l(mu); + while (!done) { + cv.wait(l); + } +} + } // namespace } // namespace testing } // namespace grpc -- cgit v1.2.3 From 9e6511ae2eb3c982bcea88096cbe079147b25fa4 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Wed, 19 Sep 2018 23:51:24 -0700 Subject: Make the core callback interface API so that it can be used in generated code --- BUILD | 1 - CMakeLists.txt | 3 - Makefile | 3 - build.yaml | 1 - gRPC-C++.podspec | 1 - grpc.gyp | 2 - include/grpc/grpc.h | 3 +- include/grpc/impl/codegen/grpc_types.h | 15 ++- include/grpcpp/impl/codegen/callback_common.h | 92 ++++++++++--- include/grpcpp/impl/codegen/client_callback.h | 2 +- src/core/lib/surface/completion_queue.cc | 34 ++--- src/core/lib/surface/completion_queue.h | 19 +-- src/core/lib/surface/completion_queue_factory.cc | 6 +- src/cpp/client/channel_cc.cc | 10 +- src/cpp/common/callback_common.cc | 149 --------------------- src/ruby/ext/grpc/rb_grpc_imports.generated.h | 2 +- test/core/end2end/inproc_callback_test.cc | 31 +++-- test/core/surface/completion_queue_test.cc | 25 ++-- tools/doxygen/Doxyfile.c++.internal | 1 - tools/run_tests/generated/sources_and_headers.json | 1 - 20 files changed, 156 insertions(+), 245 deletions(-) delete mode 100644 src/cpp/common/callback_common.cc diff --git a/BUILD b/BUILD index 271e57e36c..5b4c5e4130 100644 --- a/BUILD +++ b/BUILD @@ -119,7 +119,6 @@ GRPCXX_SRCS = [ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", - "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/completion_queue_cc.cc", diff --git a/CMakeLists.txt b/CMakeLists.txt index c358e9bd43..4f260c65ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2773,7 +2773,6 @@ add_library(grpc++ src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -3135,7 +3134,6 @@ add_library(grpc++_cronet src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc @@ -4261,7 +4259,6 @@ add_library(grpc++_unsecure src/cpp/client/credentials_cc.cc src/cpp/client/generic_stub.cc src/cpp/common/alarm.cc - src/cpp/common/callback_common.cc src/cpp/common/channel_arguments.cc src/cpp/common/channel_filter.cc src/cpp/common/completion_queue_cc.cc diff --git a/Makefile b/Makefile index 2f2537228c..79d10f94d7 100644 --- a/Makefile +++ b/Makefile @@ -5228,7 +5228,6 @@ LIBGRPC++_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -5598,7 +5597,6 @@ LIBGRPC++_CRONET_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ @@ -6682,7 +6680,6 @@ LIBGRPC++_UNSECURE_SRC = \ src/cpp/client/credentials_cc.cc \ src/cpp/client/generic_stub.cc \ src/cpp/common/alarm.cc \ - src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/completion_queue_cc.cc \ diff --git a/build.yaml b/build.yaml index d6e67aa7ee..c8512186d5 100644 --- a/build.yaml +++ b/build.yaml @@ -1327,7 +1327,6 @@ filegroups: - src/cpp/client/credentials_cc.cc - src/cpp/client/generic_stub.cc - src/cpp/common/alarm.cc - - src/cpp/common/callback_common.cc - src/cpp/common/channel_arguments.cc - src/cpp/common/channel_filter.cc - src/cpp/common/completion_queue_cc.cc diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 03ec223279..4b75865751 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -190,7 +190,6 @@ Pod::Spec.new do |s| 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', diff --git a/grpc.gyp b/grpc.gyp index b8aae44de3..654a531092 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -1381,7 +1381,6 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', @@ -1529,7 +1528,6 @@ 'src/cpp/client/credentials_cc.cc', 'src/cpp/client/generic_stub.cc', 'src/cpp/common/alarm.cc', - 'src/cpp/common/callback_common.cc', 'src/cpp/common/channel_arguments.cc', 'src/cpp/common/channel_filter.cc', 'src/cpp/common/completion_queue_cc.cc', diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 3ef95ff462..787d6ae6d7 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -111,7 +111,8 @@ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_pluck( of GRPC_CQ_CALLBACK and grpc_cq_polling_type of GRPC_CQ_DEFAULT_POLLING. This function is experimental. */ GRPCAPI grpc_completion_queue* grpc_completion_queue_create_for_callback( - void* shutdown_callback, void* reserved); + grpc_experimental_completion_queue_functor* shutdown_callback, + void* reserved); /** Create a completion queue */ GRPCAPI grpc_completion_queue* grpc_completion_queue_create( diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 5f3b96f40b..5bd50bc9ac 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -660,6 +660,19 @@ typedef enum { GRPC_CQ_CALLBACK } grpc_cq_completion_type; +/** EXPERIMENTAL: Specifies an interface class to be used as a tag + for callback-based completion queues. This can be used directly, + as the first element of a struct in C, or as a base class in C++. + Its "run" value should be assigned to some non-member function, such as + a static method. */ +typedef struct grpc_experimental_completion_queue_functor { + /** The run member specifies a function that will be called when this + tag is extracted from the completion queue. Its arguments will be a + pointer to this functor and a boolean that indicates whether the + success status of this operation */ + void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int); +} grpc_experimental_completion_queue_functor; + /* The upgrade to version 2 is currently experimental. */ #define GRPC_CQ_CURRENT_VERSION 2 @@ -678,7 +691,7 @@ typedef struct grpc_completion_queue_attributes { /* EXPERIMENTAL: START OF VERSION 2 CQ ATTRIBUTES */ /** When creating a callbackable CQ, pass in a functor to get invoked when * shutdown is complete */ - void* cq_shutdown_cb; + grpc_experimental_completion_queue_functor* cq_shutdown_cb; /* END OF VERSION 2 CQ ATTRIBUTES */ } grpc_completion_queue_attributes; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 8b3ad66a8d..044045034c 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -21,25 +21,36 @@ #include +#include #include #include #include #include #include -// Forward declarations -namespace grpc_core { -class CQCallbackInterface; -}; - namespace grpc { namespace internal { +/// An exception-safe way of invoking a user-specified callback function +template +void CatchingCallback(Func&& func, Arg&& arg) { +#if GRPC_ALLOW_EXCEPTIONS + try { + func(arg); + } catch (...) { + // nothing to return or change here, just don't crash the library + } +#else // GRPC_ALLOW_EXCEPTIONS + func(arg); +#endif // GRPC_ALLOW_EXCEPTIONS +} + // The contract on these tags is that they are single-shot. They must be // constructed and then fired at exactly one point. There is no expectation // that they can be reused without reconstruction. -class CallbackWithStatusTag { +class CallbackWithStatusTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -54,24 +65,48 @@ class CallbackWithStatusTag { static void operator delete(void*, void*) { assert(0); } CallbackWithStatusTag(grpc_call* call, std::function f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops), status_() { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithStatusTag::StaticRun; + } ~CallbackWithStatusTag() {} - void* tag() { return static_cast(impl_); } - Status* status_ptr() { return status_; } - CompletionQueueTag* ops() { return ops_; } + Status* status_ptr() { return &status_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(Status s); + void force_run(Status s) { + status_ = std::move(s); + Run(true); + } private: - grpc_core::CQCallbackInterface* impl_; - Status* status_; + grpc_call* call_; + std::function func_; CompletionQueueTag* ops_; + Status status_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast(cb)->Run(static_cast(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + + GPR_ASSERT(ops_->FinalizeResult(&ignored, &ok)); + GPR_ASSERT(ignored == ops_); + + // Last use of func_ or status_, so ok to move them out + CatchingCallback(std::move(func_), std::move(status_)); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; -class CallbackWithSuccessTag { +class CallbackWithSuccessTag + : public grpc_experimental_completion_queue_functor { public: // always allocated against a call arena, no memory free required static void operator delete(void* ptr, std::size_t size) { @@ -86,19 +121,40 @@ class CallbackWithSuccessTag { static void operator delete(void*, void*) { assert(0); } CallbackWithSuccessTag(grpc_call* call, std::function f, - CompletionQueueTag* ops); + CompletionQueueTag* ops) + : call_(call), func_(std::move(f)), ops_(ops) { + g_core_codegen_interface->grpc_call_ref(call); + functor_run = &CallbackWithSuccessTag::StaticRun; + } - void* tag() { return static_cast(impl_); } CompletionQueueTag* ops() { return ops_; } // force_run can not be performed on a tag if operations using this tag // have been sent to PerformOpsOnCall. It is intended for error conditions // that are detected before the operations are internally processed. - void force_run(bool ok); + void force_run(bool ok) { Run(ok); } private: - grpc_core::CQCallbackInterface* impl_; + grpc_call* call_; + std::function func_; CompletionQueueTag* ops_; + + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + static_cast(cb)->Run(static_cast(ok)); + } + void Run(bool ok) { + void* ignored = ops_; + bool new_ok = ok; + GPR_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + GPR_ASSERT(ignored == ops_); + + // Last use of func_, so ok to move it out for rvalue call above + CatchingCallback(std::move(func_), ok); + + func_ = nullptr; // reset to clear this out for sure + g_core_codegen_interface->grpc_call_unref(call_); + } }; } // namespace internal diff --git a/include/grpcpp/impl/codegen/client_callback.h b/include/grpcpp/impl/codegen/client_callback.h index fc81c8aa0a..4d4faea063 100644 --- a/include/grpcpp/impl/codegen/client_callback.h +++ b/include/grpcpp/impl/codegen/client_callback.h @@ -84,7 +84,7 @@ class CallbackUnaryCallImpl { ops->AllowNoMessage(); ops->ClientSendClose(); ops->ClientRecvStatus(context, tag->status_ptr()); - ops->set_cq_tag(tag->tag()); + ops->set_cq_tag(tag); call.PerformOps(ops); } }; diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index c2cf450e94..01797d493a 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -184,7 +184,8 @@ static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { typedef struct cq_vtable { grpc_cq_completion_type cq_completion_type; size_t data_size; - void (*init)(void* data, grpc_core::CQCallbackInterface* shutdown_callback); + void (*init)(void* data, + grpc_experimental_completion_queue_functor* shutdown_callback); void (*shutdown)(grpc_completion_queue* cq); void (*destroy)(void* data); bool (*begin_op)(grpc_completion_queue* cq, void* tag); @@ -267,7 +268,7 @@ typedef struct cq_callback_data { bool shutdown_called; /** A callback that gets invoked when the CQ completes shutdown */ - grpc_core::CQCallbackInterface* shutdown_callback; + grpc_experimental_completion_queue_functor* shutdown_callback; } cq_callback_data; /* Completion queue structure */ @@ -333,12 +334,12 @@ static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag, gpr_timespec deadline, void* reserved); // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); -static void cq_init_callback(void* data, - grpc_core::CQCallbackInterface* shutdown_callback); +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); +static void cq_init_callback( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback); static void cq_destroy_next(void* data); static void cq_destroy_pluck(void* data); static void cq_destroy_callback(void* data); @@ -462,7 +463,7 @@ static long cq_event_queue_num_items(grpc_cq_event_queue* q) { grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback) { + grpc_experimental_completion_queue_functor* shutdown_callback) { GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0); grpc_completion_queue* cq; @@ -497,8 +498,8 @@ grpc_completion_queue* grpc_completion_queue_create_internal( return cq; } -static void cq_init_next(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_next( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_next_data* cqd = static_cast(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -513,8 +514,8 @@ static void cq_destroy_next(void* data) { cq_event_queue_destroy(&cqd->queue); } -static void cq_init_pluck(void* data, - grpc_core::CQCallbackInterface* shutdown_callback) { +static void cq_init_pluck( + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_pluck_data* cqd = static_cast(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -532,7 +533,7 @@ static void cq_destroy_pluck(void* data) { } static void cq_init_callback( - void* data, grpc_core::CQCallbackInterface* shutdown_callback) { + void* data, grpc_experimental_completion_queue_functor* shutdown_callback) { cq_callback_data* cqd = static_cast(data); /* Initial count is dropped by grpc_completion_queue_shutdown */ gpr_atm_no_barrier_store(&cqd->pending_events, 1); @@ -859,7 +860,8 @@ static void cq_end_op_for_callback( GRPC_ERROR_UNREF(error); - (static_cast(tag))->Run(is_success); + auto* functor = static_cast(tag); + (*functor->functor_run)(functor, is_success); } void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error, @@ -1343,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - callback->Run(true); + callback->functor_run(callback, true); } static void cq_shutdown_callback(grpc_completion_queue* cq) { diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index a7c524d8e8..d60fe6d6ef 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -48,23 +48,6 @@ typedef struct grpc_cq_completion { uintptr_t next; } grpc_cq_completion; -/// For callback CQs, the tag that is passed in for an operation must -/// actually be a pointer to an implementation of the following class. -/// When the operation completes, the tag will be typecasted from void* -/// to grpc_core::CQCallbackInterface* and then the Run method will be -/// invoked on it. In practice, the language binding (e.g., C++ API -/// implementation) is responsible for providing and using an implementation -/// of this abstract base class. -namespace grpc_core { -class CQCallbackInterface { - public: - virtual ~CQCallbackInterface() {} - virtual void Run(bool) GRPC_ABSTRACT; - - GRPC_ABSTRACT_BASE_CLASS -}; -} // namespace grpc_core - #ifndef NDEBUG void grpc_cq_internal_ref(grpc_completion_queue* cc, const char* reason, const char* file, int line); @@ -106,6 +89,6 @@ int grpc_get_cq_poll_num(grpc_completion_queue* cc); grpc_completion_queue* grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type, - grpc_core::CQCallbackInterface* shutdown_callback); + grpc_experimental_completion_queue_functor* shutdown_callback); #endif /* GRPC_CORE_LIB_SURFACE_COMPLETION_QUEUE_H */ diff --git a/src/core/lib/surface/completion_queue_factory.cc b/src/core/lib/surface/completion_queue_factory.cc index ed92dd7eba..2616c156e4 100644 --- a/src/core/lib/surface/completion_queue_factory.cc +++ b/src/core/lib/surface/completion_queue_factory.cc @@ -31,8 +31,7 @@ static grpc_completion_queue* default_create( const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attr) { return grpc_completion_queue_create_internal( - attr->cq_completion_type, attr->cq_polling_type, - static_cast(attr->cq_shutdown_cb)); + attr->cq_completion_type, attr->cq_polling_type, attr->cq_shutdown_cb); } static grpc_completion_queue_factory_vtable default_vtable = {default_create}; @@ -73,7 +72,8 @@ grpc_completion_queue* grpc_completion_queue_create_for_pluck(void* reserved) { } grpc_completion_queue* grpc_completion_queue_create_for_callback( - void* shutdown_callback, void* reserved) { + grpc_experimental_completion_queue_functor* shutdown_callback, + void* reserved) { GPR_ASSERT(!reserved); grpc_completion_queue_attributes attr = { 2, GRPC_CQ_CALLBACK, GRPC_CQ_DEFAULT_POLLING, shutdown_callback}; diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc index ad71286e05..c59059f045 100644 --- a/src/cpp/client/channel_cc.cc +++ b/src/cpp/client/channel_cc.cc @@ -193,17 +193,19 @@ bool Channel::WaitForStateChangeImpl(grpc_connectivity_state last_observed, } namespace { -class ShutdownCallback : public grpc_core::CQCallbackInterface { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: + ShutdownCallback() { functor_run = &ShutdownCallback::Run; } // TakeCQ takes ownership of the cq into the shutdown callback // so that the shutdown callback will be responsible for destroying it void TakeCQ(CompletionQueue* cq) { cq_ = cq; } // The Run function will get invoked by the completion queue library // when the shutdown is actually complete - void Run(bool) override { - delete cq_; - grpc_core::Delete(this); + static void Run(grpc_experimental_completion_queue_functor* cb, int) { + auto* callback = static_cast(cb); + delete callback->cq_; + grpc_core::Delete(callback); } private: diff --git a/src/cpp/common/callback_common.cc b/src/cpp/common/callback_common.cc deleted file mode 100644 index a0c8eeb516..0000000000 --- a/src/cpp/common/callback_common.cc +++ /dev/null @@ -1,149 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include -#include - -#include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/surface/completion_queue.h" - -namespace grpc { -namespace internal { -namespace { - -template -void CatchingCallback(Func&& func, Arg&& arg) { -#if GRPC_ALLOW_EXCEPTIONS - try { - func(arg); - } catch (...) { - // nothing to return or change here, just don't crash the library - } -#else // GRPC_ALLOW_EXCEPTIONS - func(arg); -#endif // GRPC_ALLOW_EXCEPTIONS -} - -class CallbackWithSuccessImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithSuccessImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithSuccessImpl(grpc_call* call, CallbackWithSuccessTag* parent, - std::function f) - : call_(call), parent_(parent), func_(std::move(f)) { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - bool new_ok = ok; - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_, so ok to move it out for rvalue call above - CatchingCallback(std::move(func_), ok); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - - private: - grpc_call* call_; - CallbackWithSuccessTag* parent_; - std::function func_; -}; - -class CallbackWithStatusImpl : public grpc_core::CQCallbackInterface { - public: - static void operator delete(void* ptr, std::size_t size) { - assert(size == sizeof(CallbackWithStatusImpl)); - } - - // This operator should never be called as the memory should be freed as part - // of the arena destruction. It only exists to provide a matching operator - // delete to the operator new so that some compilers will not complain (see - // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this - // there are no tests catching the compiler warning. - static void operator delete(void*, void*) { assert(0); } - - CallbackWithStatusImpl(grpc_call* call, CallbackWithStatusTag* parent, - std::function f) - : call_(call), parent_(parent), func_(std::move(f)), status_() { - grpc_call_ref(call); - } - - void Run(bool ok) override { - void* ignored = parent_->ops(); - - GPR_ASSERT(parent_->ops()->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == parent_->ops()); - - // Last use of func_ or status_, so ok to move them out - CatchingCallback(std::move(func_), std::move(status_)); - - func_ = nullptr; // reset to clear this out for sure - grpc_call_unref(call_); - } - Status* status_ptr() { return &status_; } - - private: - grpc_call* call_; - CallbackWithStatusTag* parent_; - std::function func_; - Status status_; -}; - -} // namespace - -CallbackWithSuccessTag::CallbackWithSuccessTag(grpc_call* call, - std::function f, - CompletionQueueTag* ops) - : impl_(new (grpc_call_arena_alloc(call, sizeof(CallbackWithSuccessImpl))) - CallbackWithSuccessImpl(call, this, std::move(f))), - ops_(ops) {} - -void CallbackWithSuccessTag::force_run(bool ok) { impl_->Run(ok); } - -CallbackWithStatusTag::CallbackWithStatusTag(grpc_call* call, - std::function f, - CompletionQueueTag* ops) - : ops_(ops) { - auto* impl = new (grpc_call_arena_alloc(call, sizeof(CallbackWithStatusImpl))) - CallbackWithStatusImpl(call, this, std::move(f)); - impl_ = impl; - status_ = impl->status_ptr(); -} - -void CallbackWithStatusTag::force_run(Status s) { - *status_ = std::move(s); - impl_->Run(true); -} - -} // namespace internal -} // namespace grpc diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h index d00e75c326..e25d953a18 100644 --- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h +++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h @@ -107,7 +107,7 @@ extern grpc_completion_queue_create_for_next_type grpc_completion_queue_create_f typedef grpc_completion_queue*(*grpc_completion_queue_create_for_pluck_type)(void* reserved); extern grpc_completion_queue_create_for_pluck_type grpc_completion_queue_create_for_pluck_import; #define grpc_completion_queue_create_for_pluck grpc_completion_queue_create_for_pluck_import -typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(void* shutdown_callback, void* reserved); +typedef grpc_completion_queue*(*grpc_completion_queue_create_for_callback_type)(grpc_experimental_completion_queue_functor* shutdown_callback, void* reserved); extern grpc_completion_queue_create_for_callback_type grpc_completion_queue_create_for_callback_import; #define grpc_completion_queue_create_for_callback grpc_completion_queue_create_for_callback_import typedef grpc_completion_queue*(*grpc_completion_queue_create_type)(const grpc_completion_queue_factory* factory, const grpc_completion_queue_attributes* attributes, void* reserved); diff --git a/test/core/end2end/inproc_callback_test.cc b/test/core/end2end/inproc_callback_test.cc index 0d6c7c75a8..310030046a 100644 --- a/test/core/end2end/inproc_callback_test.cc +++ b/test/core/end2end/inproc_callback_test.cc @@ -37,13 +37,16 @@ typedef struct inproc_fixture_data { namespace { template -class CQDeletingCallback : public grpc_core::CQCallbackInterface { +class CQDeletingCallback : public grpc_experimental_completion_queue_functor { public: - explicit CQDeletingCallback(F f) : func_(f) {} - ~CQDeletingCallback() override {} - void Run(bool ok) override { - func_(ok); - grpc_core::Delete(this); + explicit CQDeletingCallback(F f) : func_(f) { + functor_run = &CQDeletingCallback::Run; + } + ~CQDeletingCallback() {} + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + auto* callback = static_cast(cb); + callback->func_(static_cast(ok)); + grpc_core::Delete(callback); } private: @@ -51,18 +54,24 @@ class CQDeletingCallback : public grpc_core::CQCallbackInterface { }; template -grpc_core::CQCallbackInterface* NewDeletingCallback(F f) { +grpc_experimental_completion_queue_functor* NewDeletingCallback(F f) { return grpc_core::New>(f); } -class ShutdownCallback : public grpc_core::CQCallbackInterface { +class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: ShutdownCallback() : done_(false) { + functor_run = &ShutdownCallback::StaticRun; gpr_mu_init(&mu_); gpr_cv_init(&cv_); } - ~ShutdownCallback() override {} - void Run(bool ok) override { + ~ShutdownCallback() {} + static void StaticRun(grpc_experimental_completion_queue_functor* cb, + int ok) { + auto* callback = static_cast(cb); + callback->Run(static_cast(ok)); + } + void Run(bool ok) { gpr_log(GPR_DEBUG, "CQ shutdown notification invoked"); gpr_mu_lock(&mu_); done_ = true; @@ -170,7 +179,7 @@ static void verify_tags(gpr_timespec deadline) { // This function creates a callback functor that emits the // desired tag into the global tag set -static grpc_core::CQCallbackInterface* tag(intptr_t t) { +static grpc_experimental_completion_queue_functor* tag(intptr_t t) { auto func = [t](bool ok) { gpr_mu_lock(&tags_mu); gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t); diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc index b889fd0fc6..f7ce8a7042 100644 --- a/test/core/surface/completion_queue_test.cc +++ b/test/core/surface/completion_queue_test.cc @@ -369,11 +369,15 @@ static void test_callback(void) { LOG_TEST("test_callback"); bool got_shutdown = false; - class ShutdownCallback : public grpc_core::CQCallbackInterface { + class ShutdownCallback : public grpc_experimental_completion_queue_functor { public: - ShutdownCallback(bool* done) : done_(done) {} + ShutdownCallback(bool* done) : done_(done) { + functor_run = &ShutdownCallback::Run; + } ~ShutdownCallback() {} - void Run(bool ok) override { *done_ = ok; } + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + *static_cast(cb)->done_ = static_cast(ok); + } private: bool* done_; @@ -391,14 +395,17 @@ static void test_callback(void) { grpc_completion_queue_factory_lookup(&attr), &attr, nullptr); int counter = 0; - class TagCallback : public grpc_core::CQCallbackInterface { + class TagCallback : public grpc_experimental_completion_queue_functor { public: - TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {} + TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) { + functor_run = &TagCallback::Run; + } ~TagCallback() {} - void Run(bool ok) override { - GPR_ASSERT(ok); - *counter_ += tag_; - grpc_core::Delete(this); + static void Run(grpc_experimental_completion_queue_functor* cb, int ok) { + GPR_ASSERT(static_cast(ok)); + auto* callback = static_cast(cb); + *callback->counter_ += callback->tag_; + grpc_core::Delete(callback); }; private: diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index a72390d9f8..cfc5ef98e7 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1190,7 +1190,6 @@ src/cpp/client/secure_credentials.h \ src/cpp/codegen/codegen_init.cc \ src/cpp/common/alarm.cc \ src/cpp/common/auth_property_iterator.cc \ -src/cpp/common/callback_common.cc \ src/cpp/common/channel_arguments.cc \ src/cpp/common/channel_filter.cc \ src/cpp/common/channel_filter.h \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index f3e93a0874..97f3e4c1fa 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -11470,7 +11470,6 @@ "src/cpp/client/credentials_cc.cc", "src/cpp/client/generic_stub.cc", "src/cpp/common/alarm.cc", - "src/cpp/common/callback_common.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", "src/cpp/common/channel_filter.h", -- cgit v1.2.3 From aee8271fe34d358041df72a554c844f4fc4d7185 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 20 Sep 2018 00:08:54 -0700 Subject: Fix a Status, and resolve reviewer comments --- src/compiler/cpp_generator.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index ff0b40f587..0acd211009 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -593,7 +593,7 @@ void PrintHeaderClientMethodCallbackInterfacesEnd( printer->Print("};\n"); // Declare a function to give the async stub contents. It can't be pure - // since this is new API in StubInterface, but it is meaningless by default + // since this is a new API in StubInterface, but it is meaningless by default // (since any stub that wants to use it must have its own implementation of // the callback functions therein), so make the default return value nullptr. // Intentionally include the word "class" to avoid possible shadowing. @@ -1379,7 +1379,7 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer, "void $ns$$Service$::Stub::experimental_async::$Method$(" "::grpc::ClientContext* context, " "const $Request$* request, $Response$* response, " - "std::function f) {\n"); + "std::function f) {\n"); printer->Print(*vars, " return ::grpc::internal::CallbackUnaryCall" "(stub_->channel_.get(), stub_->rpcmethod_$Method$_, " -- cgit v1.2.3 From fc1e35444b311d74e878a45ae4c8120687fa01b5 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 20 Sep 2018 00:39:49 -0700 Subject: Reset status field in tag as well as func --- include/grpcpp/impl/codegen/callback_common.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index 044045034c..ce0d451b60 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -100,7 +100,8 @@ class CallbackWithStatusTag // Last use of func_ or status_, so ok to move them out CatchingCallback(std::move(func_), std::move(status_)); - func_ = nullptr; // reset to clear this out for sure + func_ = nullptr; // reset to clear this out for sure + status_ = Status(); // reset to clear this out for sure g_core_codegen_interface->grpc_call_unref(call_); } }; -- cgit v1.2.3 From 0db69018b0ac0d58bd4b17fdafdacff3b1a2fc69 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 20 Sep 2018 00:58:29 -0700 Subject: Use GPR_CODEGEN_ASSERT in impl/codegen --- include/grpcpp/impl/codegen/callback_common.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index ce0d451b60..ab96241ce8 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -94,8 +94,8 @@ class CallbackWithStatusTag void Run(bool ok) { void* ignored = ops_; - GPR_ASSERT(ops_->FinalizeResult(&ignored, &ok)); - GPR_ASSERT(ignored == ops_); + GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &ok)); + GPR_CODEGEN_ASSERT(ignored == ops_); // Last use of func_ or status_, so ok to move them out CatchingCallback(std::move(func_), std::move(status_)); @@ -147,8 +147,8 @@ class CallbackWithSuccessTag void Run(bool ok) { void* ignored = ops_; bool new_ok = ok; - GPR_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); - GPR_ASSERT(ignored == ops_); + GPR_CODEGEN_ASSERT(ops_->FinalizeResult(&ignored, &new_ok)); + GPR_CODEGEN_ASSERT(ignored == ops_); // Last use of func_, so ok to move it out for rvalue call above CatchingCallback(std::move(func_), ok); -- cgit v1.2.3 From 74fc60e9aff49023f52db6b9e467bbc64dc551d1 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 20 Sep 2018 10:20:30 -0700 Subject: Make our C function pointer use consistent --- src/core/lib/surface/completion_queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 01797d493a..5dc9991f70 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -1345,7 +1345,7 @@ static void cq_finish_shutdown_callback(grpc_completion_queue* cq) { GPR_ASSERT(cqd->shutdown_called); cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done); - callback->functor_run(callback, true); + (*callback->functor_run)(callback, true); } static void cq_shutdown_callback(grpc_completion_queue* cq) { -- cgit v1.2.3 From 17fc4d4029e0525e8cb0a25686e078f5842b3644 Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 20 Sep 2018 10:49:00 -0700 Subject: Address reviewer comments --- include/grpc/impl/codegen/grpc_types.h | 2 +- include/grpcpp/impl/codegen/callback_common.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index 5bd50bc9ac..9ed5b3c1d4 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -669,7 +669,7 @@ typedef struct grpc_experimental_completion_queue_functor { /** The run member specifies a function that will be called when this tag is extracted from the completion queue. Its arguments will be a pointer to this functor and a boolean that indicates whether the - success status of this operation */ + operation succeeded (non-zero) or failed (zero) */ void (*functor_run)(struct grpc_experimental_completion_queue_functor*, int); } grpc_experimental_completion_queue_functor; diff --git a/include/grpcpp/impl/codegen/callback_common.h b/include/grpcpp/impl/codegen/callback_common.h index ab96241ce8..ca2f867d04 100644 --- a/include/grpcpp/impl/codegen/callback_common.h +++ b/include/grpcpp/impl/codegen/callback_common.h @@ -66,7 +66,7 @@ class CallbackWithStatusTag CallbackWithStatusTag(grpc_call* call, std::function f, CompletionQueueTag* ops) - : call_(call), func_(std::move(f)), ops_(ops), status_() { + : call_(call), func_(std::move(f)), ops_(ops) { g_core_codegen_interface->grpc_call_ref(call); functor_run = &CallbackWithStatusTag::StaticRun; } -- cgit v1.2.3 From da1b75b5d5a9e9d437886d324f61241808fe05ab Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Thu, 20 Sep 2018 12:06:55 -0700 Subject: Add a virtual destructor to the new class with virtual methods --- src/compiler/cpp_generator.cc | 1 + test/cpp/codegen/compiler_test_golden | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index 0acd211009..56716493dc 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -560,6 +560,7 @@ void PrintHeaderClientMethodCallbackInterfacesStart( // "Raw" methods since the callback-based API returns unowned raw pointers printer->Print(" public:\n"); printer->Indent(); + printer->Print("virtual ~experimental_async_interface() {}\n"); } void PrintHeaderClientMethodCallbackInterfaces( diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden index c679880763..93e1e68654 100644 --- a/test/cpp/codegen/compiler_test_golden +++ b/test/cpp/codegen/compiler_test_golden @@ -108,6 +108,7 @@ class ServiceA final { // Method A4 trailing comment 1 class experimental_async_interface { public: + virtual ~experimental_async_interface() {} // MethodA1 leading comment 1 virtual void MethodA1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) = 0; // MethodA1 trailing comment 1 @@ -519,6 +520,7 @@ class ServiceB final { // MethodB1 trailing comment 1 class experimental_async_interface { public: + virtual ~experimental_async_interface() {} // MethodB1 leading comment 1 virtual void MethodB1(::grpc::ClientContext* context, const ::grpc::testing::Request* request, ::grpc::testing::Response* response, std::function) = 0; // MethodB1 trailing comment 1 -- cgit v1.2.3