From ec0bc8b4ed4760ff0ab1e51d505f1b235fc9d60d Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 15 Jun 2016 14:02:57 -0700 Subject: Initial attempt at a C++ API for defining channel filters. --- src/cpp/common/channel_filter.cc | 98 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 src/cpp/common/channel_filter.cc (limited to 'src') diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc new file mode 100644 index 0000000000..9a80195e86 --- /dev/null +++ b/src/cpp/common/channel_filter.cc @@ -0,0 +1,98 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include + +#include "src/core/lib/channel/channel_stack.h" + +namespace grpc { + +// +// CallData +// + +void CallData::StartTransportStreamOp( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op *op) { + grpc_call_next_op(exec_ctx, elem, op); +} + +void CallData::SetPollsetOrPollsetSet( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_polling_entity *pollent) { + grpc_call_stack_ignore_set_pollset_or_pollset_set(exec_ctx, elem, pollent); +} + +char* CallData::GetPeer( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + return grpc_call_next_get_peer(exec_ctx, elem); +} + +// +// ChannelData +// + +void ChannelData::StartTransportOp( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_transport_op *op) { + grpc_channel_next_op(exec_ctx, elem, op); +} + +// +// RegisterChannelFilter() +// + +namespace internal { + +std::vector* channel_filters = nullptr; + +namespace { + +bool AppendFilter(grpc_channel_stack_builder* builder, void* arg) { + return grpc_channel_stack_builder_append_filter( + builder, (const grpc_channel_filter *)arg, nullptr, nullptr); +} + +} // namespace + +void ChannelFilterPluginInit() { + for (size_t i = 0; i < channel_filters->size(); ++i) { + FilterRecord& filter = (*channel_filters)[i]; + grpc_channel_init_register_stage(filter.stack_type, filter.priority, + AppendFilter, (void*)&filter.filter); + } +} + +} // namespace internal + +} // namespace grpc -- cgit v1.2.3 From c459ecf7c9beac5e861ade0dd4686348c1bbfdc6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 16 Jun 2016 09:17:49 -0700 Subject: - fix build problems - make changes suggested by reviewer - add test (not working yet) --- Makefile | 47 +++ build.yaml | 13 + include/grpc++/channel_filter.h | 68 ++--- src/core/lib/channel/channel_stack.h | 8 + src/core/lib/channel/channel_stack_builder.h | 8 + src/core/lib/surface/channel_init.h | 8 + src/cpp/common/channel_filter.cc | 17 +- test/cpp/end2end/filter_end2end_test.cc | 327 +++++++++++++++++++++ tools/run_tests/sources_and_headers.json | 18 ++ tools/run_tests/tests.json | 21 ++ .../filter_end2end_test.vcxproj | 207 +++++++++++++ .../filter_end2end_test.vcxproj.filters | 21 ++ 12 files changed, 721 insertions(+), 42 deletions(-) create mode 100644 test/cpp/end2end/filter_end2end_test.cc create mode 100644 vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj create mode 100644 vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj.filters (limited to 'src') diff --git a/Makefile b/Makefile index 2f88411c95..2556b74920 100644 --- a/Makefile +++ b/Makefile @@ -1014,6 +1014,7 @@ cxx_slice_test: $(BINDIR)/$(CONFIG)/cxx_slice_test cxx_string_ref_test: $(BINDIR)/$(CONFIG)/cxx_string_ref_test cxx_time_test: $(BINDIR)/$(CONFIG)/cxx_time_test end2end_test: $(BINDIR)/$(CONFIG)/end2end_test +filter_end2end_test: $(BINDIR)/$(CONFIG)/filter_end2end_test generic_end2end_test: $(BINDIR)/$(CONFIG)/generic_end2end_test golden_file_test: $(BINDIR)/$(CONFIG)/golden_file_test grpc_cli: $(BINDIR)/$(CONFIG)/grpc_cli @@ -1393,6 +1394,7 @@ buildtests_cxx: buildtests_zookeeper privatelibs_cxx \ $(BINDIR)/$(CONFIG)/cxx_string_ref_test \ $(BINDIR)/$(CONFIG)/cxx_time_test \ $(BINDIR)/$(CONFIG)/end2end_test \ + $(BINDIR)/$(CONFIG)/filter_end2end_test \ $(BINDIR)/$(CONFIG)/generic_end2end_test \ $(BINDIR)/$(CONFIG)/golden_file_test \ $(BINDIR)/$(CONFIG)/grpc_cli \ @@ -1721,6 +1723,8 @@ test_cxx: test_zookeeper buildtests_cxx $(Q) $(BINDIR)/$(CONFIG)/cxx_time_test || ( echo test cxx_time_test failed ; exit 1 ) $(E) "[RUN] Testing end2end_test" $(Q) $(BINDIR)/$(CONFIG)/end2end_test || ( echo test end2end_test failed ; exit 1 ) + $(E) "[RUN] Testing filter_end2end_test" + $(Q) $(BINDIR)/$(CONFIG)/filter_end2end_test || ( echo test filter_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing generic_end2end_test" $(Q) $(BINDIR)/$(CONFIG)/generic_end2end_test || ( echo test generic_end2end_test failed ; exit 1 ) $(E) "[RUN] Testing golden_file_test" @@ -10851,6 +10855,49 @@ endif endif +FILTER_END2END_TEST_SRC = \ + test/cpp/end2end/filter_end2end_test.cc \ + +FILTER_END2END_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(FILTER_END2END_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/filter_end2end_test: openssl_dep_error + +else + + + + +ifeq ($(NO_PROTOBUF),true) + +# You can't build the protoc plugins or protobuf-enabled targets if you don't have protobuf 3.0.0+. + +$(BINDIR)/$(CONFIG)/filter_end2end_test: protobuf_dep_error + +else + +$(BINDIR)/$(CONFIG)/filter_end2end_test: $(PROTOBUF_DEP) $(FILTER_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LDXX) $(LDFLAGS) $(FILTER_END2END_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS_PROTOBUF) $(LDLIBS) $(LDLIBS_SECURE) $(GTEST_LIB) -o $(BINDIR)/$(CONFIG)/filter_end2end_test + +endif + +endif + +$(OBJDIR)/$(CONFIG)/test/cpp/end2end/filter_end2end_test.o: $(LIBDIR)/$(CONFIG)/libgrpc++_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc++.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_filter_end2end_test: $(FILTER_END2END_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(FILTER_END2END_TEST_OBJS:.o=.dep) +endif +endif + + GENERIC_END2END_TEST_SRC = \ test/cpp/end2end/generic_end2end_test.cc \ diff --git a/build.yaml b/build.yaml index b6b1f75853..ad1211f219 100644 --- a/build.yaml +++ b/build.yaml @@ -2611,6 +2611,19 @@ targets: - grpc - gpr_test_util - gpr +- name: filter_end2end_test + gtest: true + build: test + language: c++ + src: + - test/cpp/end2end/filter_end2end_test.cc + deps: + - grpc++_test_util + - grpc_test_util + - grpc++ + - grpc + - gpr_test_util + - gpr - name: generic_end2end_test gtest: true build: test diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h index 8067ca9c60..b37d986a5a 100644 --- a/include/grpc++/channel_filter.h +++ b/include/grpc++/channel_filter.h @@ -36,6 +36,7 @@ #include +#include #include #include "src/core/lib/channel/channel_stack.h" @@ -45,9 +46,9 @@ // An interface to define filters. // // To define a filter, implement a subclass of each of CallData and -// ChannelData. Then register the filter like this: +// ChannelData. Then register the filter using something like this: // RegisterChannelFilter( -// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX); +// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); // namespace grpc { @@ -56,12 +57,8 @@ namespace grpc { // Note: Must be copyable. class CallData { public: - // Do not override the destructor. Instead, put clean-up code in the - // Destroy() method. virtual ~CallData() {} - virtual void Destroy() {} - virtual void StartTransportStreamOp( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op); @@ -80,12 +77,8 @@ class CallData { // Note: Must be copyable. class ChannelData { public: - // Do not override the destructor. Instead, put clean-up code in the - // Destroy() method. virtual ~ChannelData() {} - virtual void Destroy() {} - virtual void StartTransportOp( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op); @@ -99,43 +92,40 @@ namespace internal { // Defines static members for passing to C core. template class ChannelFilter { + public: static const size_t call_data_size = sizeof(CallDataType); static void InitCallElement( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { - CallDataType* call_data = elem->call_data; - *call_data = CallDataType(); + // Construct the object in the already-allocated memory. + new (elem->call_data) CallDataType(); } static void DestroyCallElement( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_stats *stats, void *and_free_memory) { - CallDataType* call_data = elem->call_data; - // Can't destroy the object here, since it's not allocated by - // itself; instead, it's part of a larger allocation managed by - // C-core. So instead, we call the Destroy() method. - call_data->Destroy(); + reinterpret_cast(elem->call_data)->~CallDataType(); } static void StartTransportStreamOp( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { - CallDataType* call_data = elem->call_data; - call_data->StartTransportStreamOp(exec_ctx, op); + CallDataType* call_data = (CallDataType*)elem->call_data; + call_data->StartTransportStreamOp(exec_ctx, elem, op); } static void SetPollsetOrPollsetSet( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_polling_entity *pollent) { - CallDataType* call_data = elem->call_data; - call_data->SetPollsetOrPollsetSet(exec_ctx, pollent); + CallDataType* call_data = (CallDataType*)elem->call_data; + call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent); } static char* GetPeer( grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - CallDataType* call_data = elem->call_data; - return call_data->GetPeer(exec_ctx); + CallDataType* call_data = (CallDataType*)elem->call_data; + return call_data->GetPeer(exec_ctx, elem); } static const size_t channel_data_size = sizeof(ChannelDataType); @@ -143,44 +133,45 @@ class ChannelFilter { static void InitChannelElement( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { - ChannelDataType* channel_data = elem->channel_data; - *channel_data = ChannelDataType(); + // Construct the object in the already-allocated memory. + new (elem->channel_data) ChannelDataType(); } static void DestroyChannelElement( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - ChannelDataType* channel_data = elem->channel_data; - // Can't destroy the object here, since it's not allocated by - // itself; instead, it's part of a larger allocation managed by - // C-core. So instead, we call the Destroy() method. - channel_data->Destroy(); + reinterpret_cast(elem->channel_data)->~ChannelDataType(); } static void StartTransportOp( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { - ChannelDataType* channel_data = elem->channel_data; - channel_data->StartTransportOp(exec_ctx, op); + ChannelDataType* channel_data = (ChannelDataType*)elem->channel_data; + channel_data->StartTransportOp(exec_ctx, elem, op); } }; struct FilterRecord { grpc_channel_stack_type stack_type; int priority; + std::function include_filter; grpc_channel_filter filter; }; extern std::vector* channel_filters; void ChannelFilterPluginInit(); -void ChannelFilterPluginShutdown() {} +void ChannelFilterPluginShutdown(); } // namespace internal // Registers a new filter. // Must be called by only one thread at a time. +// The include_filter argument specifies a function that will be called +// to determine at run-time whether or not to add the filter. If the +// value is nullptr, the filter will be added unconditionally. template -void RegisterChannelFilter(const char* name, - grpc_channel_stack_type stack_type, int priority) { +void RegisterChannelFilter( + const char* name, grpc_channel_stack_type stack_type, int priority, + std::function include_filter) { // If we haven't been called before, initialize channel_filters and // call grpc_register_plugin(). if (internal::channel_filters == nullptr) { @@ -191,8 +182,8 @@ void RegisterChannelFilter(const char* name, // Add an entry to channel_filters. The filter will be added when the // C-core initialization code calls ChannelFilterPluginInit(). typedef internal::ChannelFilter FilterType; - internal::channel_filters->emplace_back({ - stack_type, priority, { + internal::FilterRecord filter_record = { + stack_type, priority, include_filter, { FilterType::StartTransportStreamOp, FilterType::StartTransportOp, FilterType::call_data_size, @@ -203,7 +194,8 @@ void RegisterChannelFilter(const char* name, FilterType::InitChannelElement, FilterType::DestroyChannelElement, FilterType::GetPeer, - name}}); + name}}; + internal::channel_filters->push_back(filter_record); } } // namespace grpc diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 41dd4a0d8a..535acde393 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -51,6 +51,10 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/transport/transport.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_channel_element grpc_channel_element; typedef struct grpc_call_element grpc_call_element; @@ -278,4 +282,8 @@ extern int grpc_trace_channel; #define GRPC_CALL_LOG_OP(sev, elem, op) \ if (grpc_trace_channel) grpc_call_log_op(sev, elem, op) +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_H */ diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index 0e6bfd9aa6..4a00f7bfdb 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -39,6 +39,10 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_stack.h" +#ifdef __cplusplus +extern "C" { +#endif + /// grpc_channel_stack_builder offers a programmatic interface to selected /// and order channel filters typedef struct grpc_channel_stack_builder grpc_channel_stack_builder; @@ -158,4 +162,8 @@ void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder *builder); extern int grpc_trace_channel_stack_builder; +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */ diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h index 3a18a61ddb..b53f2aefb9 100644 --- a/src/core/lib/surface/channel_init.h +++ b/src/core/lib/surface/channel_init.h @@ -40,6 +40,10 @@ #define GRPC_CHANNEL_INIT_BUILTIN_PRIORITY 10000 +#ifdef __cplusplus +extern "C" { +#endif + /// This module provides a way for plugins (and the grpc core library itself) /// to register mutators for channel stacks. /// It also provides a universal entry path to run those mutators to build @@ -84,4 +88,8 @@ bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, grpc_channel_stack_type type); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SURFACE_CHANNEL_INIT_H */ diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index 9a80195e86..b5e5e08976 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -78,9 +78,16 @@ std::vector* channel_filters = nullptr; namespace { -bool AppendFilter(grpc_channel_stack_builder* builder, void* arg) { - return grpc_channel_stack_builder_append_filter( - builder, (const grpc_channel_filter *)arg, nullptr, nullptr); +bool MaybeAddFilter(grpc_channel_stack_builder* builder, void* arg) { + const FilterRecord& filter = *(FilterRecord*)arg; + if (filter.include_filter != nullptr) { + const grpc_channel_args *args = + grpc_channel_stack_builder_get_channel_arguments(builder); + if (!filter.include_filter(args)) + return true; + } + return grpc_channel_stack_builder_prepend_filter( + builder, &filter.filter, nullptr, nullptr); } } // namespace @@ -89,10 +96,12 @@ void ChannelFilterPluginInit() { for (size_t i = 0; i < channel_filters->size(); ++i) { FilterRecord& filter = (*channel_filters)[i]; grpc_channel_init_register_stage(filter.stack_type, filter.priority, - AppendFilter, (void*)&filter.filter); + MaybeAddFilter, (void*)&filter); } } +void ChannelFilterPluginShutdown() {} + } // namespace internal } // namespace grpc diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc new file mode 100644 index 0000000000..79a5287fbc --- /dev/null +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -0,0 +1,327 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/util/byte_buffer_proto_helper.h" + +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { +namespace { + +void* tag(int i) { return (void*)(intptr_t)i; } + +void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { + bool ok; + void* got_tag; + EXPECT_TRUE(cq->Next(&got_tag, &ok)); + EXPECT_EQ(expect_ok, ok); + EXPECT_EQ(tag(i), got_tag); +} + +namespace { + +int global_num_calls = 0; +mutex global_mu; + +void IncrementCounter() { + unique_lock lock(global_mu); + ++global_num_calls; +} + +void ResetCounter() { + unique_lock lock(global_mu); + global_num_calls = 0; +} + +int GetCounterValue() { + unique_lock lock(global_mu); + return global_num_calls; +} + +} // namespace + +class CallDataImpl : public CallData { + public: + CallDataImpl() {} + virtual ~CallDataImpl() {} + + void StartTransportStreamOp( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op *op) { + gpr_log(GPR_ERROR, "INCREMENTING"); + IncrementCounter(); + grpc_call_next_op(exec_ctx, elem, op); + } +}; + +class ChannelDataImpl : public ChannelData { + public: + ChannelDataImpl() {} + virtual ~ChannelDataImpl() {} +}; + +class FilterEnd2endTest : public ::testing::Test { + protected: + FilterEnd2endTest() : server_host_("localhost") {} + + void SetUp() GRPC_OVERRIDE { + RegisterChannelFilter( + "test-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); + int port = grpc_pick_unused_port_or_die(); + server_address_ << server_host_ << ":" << port; + // Setup server + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + InsecureServerCredentials()); + builder.RegisterAsyncGenericService(&generic_service_); + // Include a second call to RegisterAsyncGenericService to make sure that + // we get an error in the log, since it is not allowed to have 2 async + // generic services + builder.RegisterAsyncGenericService(&generic_service_); + srv_cq_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + } + + void TearDown() GRPC_OVERRIDE { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cli_cq_.Shutdown(); + srv_cq_->Shutdown(); + while (cli_cq_.Next(&ignored_tag, &ignored_ok)) + ; + while (srv_cq_->Next(&ignored_tag, &ignored_ok)) + ; + } + + void ResetStub() { + std::shared_ptr channel = + CreateChannel(server_address_.str(), InsecureChannelCredentials()); + generic_stub_.reset(new GenericStub(channel)); + ResetCounter(); + } + + void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } + void client_ok(int i) { verify_ok(&cli_cq_, i, true); } + void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); } + void client_fail(int i) { verify_ok(&cli_cq_, i, false); } + + void SendRpc(int num_rpcs) { + const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo"); + for (int i = 0; i < num_rpcs; i++) { + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + GenericServerContext srv_ctx; + GenericServerAsyncReaderWriter stream(&srv_ctx); + + // The string needs to be long enough to test heap-based slice. + send_request.set_message("Hello world. Hello world. Hello world."); + std::unique_ptr call = + generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); + client_ok(1); + std::unique_ptr send_buffer = + SerializeToByteBuffer(&send_request); + call->Write(*send_buffer, tag(2)); + // Send ByteBuffer can be destroyed after calling Write. + send_buffer.reset(); + client_ok(2); + call->WritesDone(tag(3)); + client_ok(3); + + generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(), + srv_cq_.get(), tag(4)); + + verify_ok(srv_cq_.get(), 4, true); + EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); + EXPECT_EQ(kMethodName, srv_ctx.method()); + ByteBuffer recv_buffer; + stream.Read(&recv_buffer, tag(5)); + server_ok(5); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + send_buffer = SerializeToByteBuffer(&send_response); + stream.Write(*send_buffer, tag(6)); + send_buffer.reset(); + server_ok(6); + + stream.Finish(Status::OK, tag(7)); + server_ok(7); + + recv_buffer.Clear(); + call->Read(&recv_buffer, tag(8)); + client_ok(8); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); + + call->Finish(&recv_status, tag(9)); + client_ok(9); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } + } + + CompletionQueue cli_cq_; + std::unique_ptr srv_cq_; + std::unique_ptr stub_; + std::unique_ptr generic_stub_; + std::unique_ptr server_; + AsyncGenericService generic_service_; + const grpc::string server_host_; + std::ostringstream server_address_; +}; + +TEST_F(FilterEnd2endTest, SimpleRpc) { + ResetStub(); + EXPECT_EQ(0, GetCounterValue()); + SendRpc(1); + EXPECT_EQ(1, GetCounterValue()); +} + +TEST_F(FilterEnd2endTest, SequentialRpcs) { + ResetStub(); + EXPECT_EQ(0, GetCounterValue()); + SendRpc(10); + EXPECT_EQ(10, GetCounterValue()); +} + +// One ping, one pong. +TEST_F(FilterEnd2endTest, SimpleBidiStreaming) { + ResetStub(); + EXPECT_EQ(0, GetCounterValue()); + + const grpc::string kMethodName( + "/grpc.cpp.test.util.EchoTestService/BidiStream"); + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + ClientContext cli_ctx; + GenericServerContext srv_ctx; + GenericServerAsyncReaderWriter srv_stream(&srv_ctx); + + cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP); + send_request.set_message("Hello"); + std::unique_ptr cli_stream = + generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1)); + client_ok(1); + + generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(), + srv_cq_.get(), tag(2)); + + verify_ok(srv_cq_.get(), 2, true); + EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length())); + EXPECT_EQ(kMethodName, srv_ctx.method()); + + std::unique_ptr send_buffer = + SerializeToByteBuffer(&send_request); + cli_stream->Write(*send_buffer, tag(3)); + send_buffer.reset(); + client_ok(3); + + ByteBuffer recv_buffer; + srv_stream.Read(&recv_buffer, tag(4)); + server_ok(4); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request)); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + send_buffer = SerializeToByteBuffer(&send_response); + srv_stream.Write(*send_buffer, tag(5)); + send_buffer.reset(); + server_ok(5); + + cli_stream->Read(&recv_buffer, tag(6)); + client_ok(6); + EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response)); + EXPECT_EQ(send_response.message(), recv_response.message()); + + cli_stream->WritesDone(tag(7)); + client_ok(7); + + srv_stream.Read(&recv_buffer, tag(8)); + server_fail(8); + + srv_stream.Finish(Status::OK, tag(9)); + server_ok(9); + + cli_stream->Finish(&recv_status, tag(10)); + client_ok(10); + + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + + EXPECT_EQ(1, GetCounterValue()); +} + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 00ea2fdc05..ee6cda804a 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -2062,6 +2062,24 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc++", + "grpc++_test_util", + "grpc_test_util" + ], + "headers": [], + "language": "c++", + "name": "filter_end2end_test", + "src": [ + "test/cpp/end2end/filter_end2end_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 5a84a41b63..5744261c99 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -2210,6 +2210,27 @@ "windows" ] }, + { + "args": [], + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "filter_end2end_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ] + }, { "args": [], "ci_platforms": [ diff --git a/vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj b/vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj new file mode 100644 index 0000000000..99c000e4fa --- /dev/null +++ b/vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj @@ -0,0 +1,207 @@ + + + + + + Debug + Win32 + + + Debug + x64 + + + Release + Win32 + + + Release + x64 + + + + {1D42975A-18A5-09D0-30B1-2AE6A97FB9DE} + true + $(SolutionDir)IntDir\$(MSBuildProjectName)\ + + + + v100 + + + v110 + + + v120 + + + v140 + + + Application + true + Unicode + + + Application + false + true + Unicode + + + + + + + + + + + + + + + + filter_end2end_test + static + Debug + static + Debug + + + filter_end2end_test + static + Release + static + Release + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Console + true + false + + + + + + NotUsing + Level3 + Disabled + WIN32;_DEBUG;_LIB;%(PreprocessorDefinitions) + true + MultiThreadedDebug + true + None + false + + + Console + true + false + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Console + true + false + true + true + + + + + + NotUsing + Level3 + MaxSpeed + WIN32;NDEBUG;_LIB;%(PreprocessorDefinitions) + true + true + true + MultiThreaded + true + None + false + + + Console + true + false + true + true + + + + + + + + + + {0BE77741-552A-929B-A497-4EF7ECE17A64} + + + {17BCAFC0-5FDC-4C94-AEB9-95F3E220614B} + + + {C187A093-A0FE-489D-A40A-6E33DE0F9FEB} + + + {29D16885-7228-4C31-81ED-5F9187C7F2A9} + + + {EAB0A629-17A9-44DB-B5FF-E91A721FE037} + + + {B23D3D1A-9438-4EDA-BEB6-9A0A03D17792} + + + + + + + + + + + + + + + This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. + + + + + + + + + diff --git a/vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj.filters b/vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj.filters new file mode 100644 index 0000000000..c68442365c --- /dev/null +++ b/vsprojects/vcxproj/test/filter_end2end_test/filter_end2end_test.vcxproj.filters @@ -0,0 +1,21 @@ + + + + + test\cpp\end2end + + + + + + {f7581160-220d-1118-f29e-6b37e45671c9} + + + {63784d35-03cc-1a7e-0c95-a9451bc1daf4} + + + {d2a01682-970e-d23d-1eb8-ef020e3a1ca3} + + + + -- cgit v1.2.3 From c008b33c18f1d2c49ee1b6683c2d13a85e2c2432 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 16 Jun 2016 10:39:39 -0700 Subject: Pass channel args to ChannelData ctor and ChannelData to CallData ctor. --- include/grpc++/channel_filter.h | 79 +++++++++++++++++---------------- src/cpp/common/channel_filter.cc | 2 +- test/cpp/end2end/filter_end2end_test.cc | 15 ++++--- 3 files changed, 49 insertions(+), 47 deletions(-) (limited to 'src') diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h index b37d986a5a..8731a5e965 100644 --- a/include/grpc++/channel_filter.h +++ b/include/grpc++/channel_filter.h @@ -53,6 +53,20 @@ namespace grpc { +// Represents channel data. +// Note: Must be copyable. +class ChannelData { + public: + virtual ~ChannelData() {} + + virtual void StartTransportOp( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_transport_op *op); + + protected: + explicit ChannelData(const grpc_channel_args&) {} +}; + // Represents call data. // Note: Must be copyable. class CallData { @@ -70,21 +84,7 @@ class CallData { virtual char* GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); protected: - CallData() {} -}; - -// Represents channel data. -// Note: Must be copyable. -class ChannelData { - public: - virtual ~ChannelData() {} - - virtual void StartTransportOp( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_transport_op *op); - - protected: - ChannelData() {} + explicit CallData(const ChannelData&) {} }; namespace internal { @@ -93,13 +93,35 @@ namespace internal { template class ChannelFilter { public: + static const size_t channel_data_size = sizeof(ChannelDataType); + + static void InitChannelElement( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_channel_element_args *args) { + // Construct the object in the already-allocated memory. + new (elem->channel_data) ChannelDataType(*args->channel_args); + } + + static void DestroyChannelElement( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { + reinterpret_cast(elem->channel_data)->~ChannelDataType(); + } + + static void StartTransportOp( + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_transport_op *op) { + ChannelDataType* channel_data = (ChannelDataType*)elem->channel_data; + channel_data->StartTransportOp(exec_ctx, elem, op); + } + static const size_t call_data_size = sizeof(CallDataType); static void InitCallElement( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { + const ChannelDataType& channel_data = *(ChannelDataType*)elem->channel_data; // Construct the object in the already-allocated memory. - new (elem->call_data) CallDataType(); + new (elem->call_data) CallDataType(channel_data); } static void DestroyCallElement( @@ -127,33 +149,12 @@ class ChannelFilter { CallDataType* call_data = (CallDataType*)elem->call_data; return call_data->GetPeer(exec_ctx, elem); } - - static const size_t channel_data_size = sizeof(ChannelDataType); - - static void InitChannelElement( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_channel_element_args *args) { - // Construct the object in the already-allocated memory. - new (elem->channel_data) ChannelDataType(); - } - - static void DestroyChannelElement( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - reinterpret_cast(elem->channel_data)->~ChannelDataType(); - } - - static void StartTransportOp( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_transport_op *op) { - ChannelDataType* channel_data = (ChannelDataType*)elem->channel_data; - channel_data->StartTransportOp(exec_ctx, elem, op); - } }; struct FilterRecord { grpc_channel_stack_type stack_type; int priority; - std::function include_filter; + std::function include_filter; grpc_channel_filter filter; }; extern std::vector* channel_filters; @@ -171,7 +172,7 @@ void ChannelFilterPluginShutdown(); template void RegisterChannelFilter( const char* name, grpc_channel_stack_type stack_type, int priority, - std::function include_filter) { + std::function include_filter) { // If we haven't been called before, initialize channel_filters and // call grpc_register_plugin(). if (internal::channel_filters == nullptr) { diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index b5e5e08976..77b2a26e8c 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -83,7 +83,7 @@ bool MaybeAddFilter(grpc_channel_stack_builder* builder, void* arg) { if (filter.include_filter != nullptr) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); - if (!filter.include_filter(args)) + if (!filter.include_filter(*args)) return true; } return grpc_channel_stack_builder_prepend_filter( diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index be6988c6ba..16151c21b8 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -95,9 +95,16 @@ int GetCounterValue() { } // namespace +class ChannelDataImpl : public ChannelData { + public: + explicit ChannelDataImpl(const grpc_channel_args& args) : ChannelData(args) {} + virtual ~ChannelDataImpl() {} +}; + class CallDataImpl : public CallData { public: - CallDataImpl() {} + explicit CallDataImpl(const ChannelDataImpl& channel_data) + : CallData(channel_data) {} virtual ~CallDataImpl() {} void StartTransportStreamOp( @@ -109,12 +116,6 @@ class CallDataImpl : public CallData { } }; -class ChannelDataImpl : public ChannelData { - public: - ChannelDataImpl() {} - virtual ~ChannelDataImpl() {} -}; - class FilterEnd2endTest : public ::testing::Test { protected: FilterEnd2endTest() : server_host_("localhost") {} -- cgit v1.2.3 From f9c1f7a412619e78a93a116068fe0fe373e172e1 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 16 Jun 2016 10:57:28 -0700 Subject: clang-format --- include/grpc++/channel_filter.h | 114 +++++++++++++++----------------- src/cpp/common/channel_filter.cc | 38 +++++------ test/cpp/end2end/filter_end2end_test.cc | 8 +-- 3 files changed, 76 insertions(+), 84 deletions(-) (limited to 'src') diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h index 8731a5e965..f73abe8e6c 100644 --- a/include/grpc++/channel_filter.h +++ b/include/grpc++/channel_filter.h @@ -59,12 +59,12 @@ class ChannelData { public: virtual ~ChannelData() {} - virtual void StartTransportOp( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_transport_op *op); + virtual void StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op); protected: - explicit ChannelData(const grpc_channel_args&) {} + explicit ChannelData(const grpc_channel_args &) {} }; // Represents call data. @@ -73,80 +73,80 @@ class CallData { public: virtual ~CallData() {} - virtual void StartTransportStreamOp( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op); + virtual void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op); - virtual void SetPollsetOrPollsetSet( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_polling_entity *pollent); + virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent); - virtual char* GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); protected: - explicit CallData(const ChannelData&) {} + explicit CallData(const ChannelData &) {} }; namespace internal { // Defines static members for passing to C core. -template +template class ChannelFilter { public: static const size_t channel_data_size = sizeof(ChannelDataType); - static void InitChannelElement( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_channel_element_args *args) { + static void InitChannelElement(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { // Construct the object in the already-allocated memory. new (elem->channel_data) ChannelDataType(*args->channel_args); } - static void DestroyChannelElement( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - reinterpret_cast(elem->channel_data)->~ChannelDataType(); + static void DestroyChannelElement(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + reinterpret_cast(elem->channel_data)->~ChannelDataType(); } - static void StartTransportOp( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_transport_op *op) { - ChannelDataType* channel_data = (ChannelDataType*)elem->channel_data; + static void StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + ChannelDataType *channel_data = (ChannelDataType *)elem->channel_data; channel_data->StartTransportOp(exec_ctx, elem, op); } static const size_t call_data_size = sizeof(CallDataType); - static void InitCallElement( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { - const ChannelDataType& channel_data = *(ChannelDataType*)elem->channel_data; + static void InitCallElement(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_call_element_args *args) { + const ChannelDataType &channel_data = + *(ChannelDataType *)elem->channel_data; // Construct the object in the already-allocated memory. new (elem->call_data) CallDataType(channel_data); } - static void DestroyCallElement( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_stats *stats, void *and_free_memory) { - reinterpret_cast(elem->call_data)->~CallDataType(); + static void DestroyCallElement(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_stats *stats, + void *and_free_memory) { + reinterpret_cast(elem->call_data)->~CallDataType(); } - static void StartTransportStreamOp( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { - CallDataType* call_data = (CallDataType*)elem->call_data; + static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + CallDataType *call_data = (CallDataType *)elem->call_data; call_data->StartTransportStreamOp(exec_ctx, elem, op); } - static void SetPollsetOrPollsetSet( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_polling_entity *pollent) { - CallDataType* call_data = (CallDataType*)elem->call_data; + static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent) { + CallDataType *call_data = (CallDataType *)elem->call_data; call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent); } - static char* GetPeer( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - CallDataType* call_data = (CallDataType*)elem->call_data; + static char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + CallDataType *call_data = (CallDataType *)elem->call_data; return call_data->GetPeer(exec_ctx, elem); } }; @@ -154,10 +154,10 @@ class ChannelFilter { struct FilterRecord { grpc_channel_stack_type stack_type; int priority; - std::function include_filter; + std::function include_filter; grpc_channel_filter filter; }; -extern std::vector* channel_filters; +extern std::vector *channel_filters; void ChannelFilterPluginInit(); void ChannelFilterPluginShutdown(); @@ -169,10 +169,10 @@ void ChannelFilterPluginShutdown(); // The include_filter argument specifies a function that will be called // to determine at run-time whether or not to add the filter. If the // value is nullptr, the filter will be added unconditionally. -template +template void RegisterChannelFilter( - const char* name, grpc_channel_stack_type stack_type, int priority, - std::function include_filter) { + const char *name, grpc_channel_stack_type stack_type, int priority, + std::function include_filter) { // If we haven't been called before, initialize channel_filters and // call grpc_register_plugin(). if (internal::channel_filters == nullptr) { @@ -184,18 +184,14 @@ void RegisterChannelFilter( // C-core initialization code calls ChannelFilterPluginInit(). typedef internal::ChannelFilter FilterType; internal::FilterRecord filter_record = { - stack_type, priority, include_filter, { - FilterType::StartTransportStreamOp, - FilterType::StartTransportOp, - FilterType::call_data_size, - FilterType::InitCallElement, - FilterType::SetPollsetOrPollsetSet, - FilterType::DestroyCallElement, - FilterType::channel_data_size, - FilterType::InitChannelElement, - FilterType::DestroyChannelElement, - FilterType::GetPeer, - name}}; + stack_type, + priority, + include_filter, + {FilterType::StartTransportStreamOp, FilterType::StartTransportOp, + FilterType::call_data_size, FilterType::InitCallElement, + FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement, + FilterType::channel_data_size, FilterType::InitChannelElement, + FilterType::DestroyChannelElement, FilterType::GetPeer, name}}; internal::channel_filters->push_back(filter_record); } diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index 77b2a26e8c..86df47903f 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -41,20 +41,19 @@ namespace grpc { // CallData // -void CallData::StartTransportStreamOp( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { +void CallData::StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { grpc_call_next_op(exec_ctx, elem, op); } -void CallData::SetPollsetOrPollsetSet( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_polling_entity *pollent) { +void CallData::SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent) { grpc_call_stack_ignore_set_pollset_or_pollset_set(exec_ctx, elem, pollent); } -char* CallData::GetPeer( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { +char *CallData::GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return grpc_call_next_get_peer(exec_ctx, elem); } @@ -62,9 +61,9 @@ char* CallData::GetPeer( // ChannelData // -void ChannelData::StartTransportOp( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_transport_op *op) { +void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { grpc_channel_next_op(exec_ctx, elem, op); } @@ -74,29 +73,28 @@ void ChannelData::StartTransportOp( namespace internal { -std::vector* channel_filters = nullptr; +std::vector *channel_filters = nullptr; namespace { -bool MaybeAddFilter(grpc_channel_stack_builder* builder, void* arg) { - const FilterRecord& filter = *(FilterRecord*)arg; +bool MaybeAddFilter(grpc_channel_stack_builder *builder, void *arg) { + const FilterRecord &filter = *(FilterRecord *)arg; if (filter.include_filter != nullptr) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); - if (!filter.include_filter(*args)) - return true; + if (!filter.include_filter(*args)) return true; } - return grpc_channel_stack_builder_prepend_filter( - builder, &filter.filter, nullptr, nullptr); + return grpc_channel_stack_builder_prepend_filter(builder, &filter.filter, + nullptr, nullptr); } } // namespace void ChannelFilterPluginInit() { for (size_t i = 0; i < channel_filters->size(); ++i) { - FilterRecord& filter = (*channel_filters)[i]; + FilterRecord &filter = (*channel_filters)[i]; grpc_channel_init_register_stage(filter.stack_type, filter.priority, - MaybeAddFilter, (void*)&filter); + MaybeAddFilter, (void *)&filter); } } diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index 16151c21b8..d72e8100d7 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -107,11 +107,9 @@ class CallDataImpl : public CallData { : CallData(channel_data) {} virtual ~CallDataImpl() {} - void StartTransportStreamOp( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { - if (op->recv_initial_metadata != nullptr) - IncrementCounter(); + void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_transport_stream_op* op) { + if (op->recv_initial_metadata != nullptr) IncrementCounter(); grpc_call_next_op(exec_ctx, elem, op); } }; -- cgit v1.2.3 From f5bbff9c8e314e29e12e9891c75908f60d941a5f Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 16 Jun 2016 12:56:07 -0700 Subject: Fix portability issues. --- src/cpp/common/channel_filter.cc | 5 +++-- test/cpp/end2end/filter_end2end_test.cc | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index 86df47903f..f81a01c62d 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -73,13 +73,14 @@ void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, namespace internal { -std::vector *channel_filters = nullptr; +// Note: Implicitly initialized to nullptr due to static lifetime. +std::vector *channel_filters; namespace { bool MaybeAddFilter(grpc_channel_stack_builder *builder, void *arg) { const FilterRecord &filter = *(FilterRecord *)arg; - if (filter.include_filter != nullptr) { + if (filter.include_filter) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); if (!filter.include_filter(*args)) return true; diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index 7cb77be42d..4ba3b9e0d1 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -108,7 +108,7 @@ class CallDataImpl : public CallData { virtual ~CallDataImpl() {} void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op* op) { + grpc_transport_stream_op* op) GRPC_OVERRIDE { if (op->recv_initial_metadata != nullptr) IncrementCounter(); grpc_call_next_op(exec_ctx, elem, op); } -- cgit v1.2.3 From 9fe5ef5f9d17289f57ee1d5436314c3153b80975 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 16 Jun 2016 14:26:04 -0700 Subject: Removed unnecessary comments. Added connection counter to test. --- include/grpc++/channel_filter.h | 2 -- src/cpp/common/channel_filter.cc | 20 ++++++------ test/cpp/end2end/filter_end2end_test.cc | 54 ++++++++++++++++++++++++--------- 3 files changed, 50 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h index f73abe8e6c..542e6a0f35 100644 --- a/include/grpc++/channel_filter.h +++ b/include/grpc++/channel_filter.h @@ -54,7 +54,6 @@ namespace grpc { // Represents channel data. -// Note: Must be copyable. class ChannelData { public: virtual ~ChannelData() {} @@ -68,7 +67,6 @@ class ChannelData { }; // Represents call data. -// Note: Must be copyable. class CallData { public: virtual ~CallData() {} diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index f81a01c62d..f473d63a25 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -37,6 +37,16 @@ namespace grpc { +// +// ChannelData +// + +void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + grpc_channel_next_op(exec_ctx, elem, op); +} + // // CallData // @@ -57,16 +67,6 @@ char *CallData::GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return grpc_call_next_get_peer(exec_ctx, elem); } -// -// ChannelData -// - -void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { - grpc_channel_next_op(exec_ctx, elem, op); -} - // // RegisterChannelFilter() // diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index 4ba3b9e0d1..b21d377d5d 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -75,20 +75,36 @@ void verify_ok(CompletionQueue* cq, int i, bool expect_ok) { namespace { +int global_num_connections = 0; int global_num_calls = 0; mutex global_mu; -void IncrementCounter() { +void IncrementConnectionCounter() { + unique_lock lock(global_mu); + ++global_num_connections; +} + +void ResetConnectionCounter() { + unique_lock lock(global_mu); + global_num_connections = 0; +} + +int GetConnectionCounterValue() { + unique_lock lock(global_mu); + return global_num_connections; +} + +void IncrementCallCounter() { unique_lock lock(global_mu); ++global_num_calls; } -void ResetCounter() { +void ResetCallCounter() { unique_lock lock(global_mu); global_num_calls = 0; } -int GetCounterValue() { +int GetCallCounterValue() { unique_lock lock(global_mu); return global_num_calls; } @@ -97,19 +113,22 @@ int GetCounterValue() { class ChannelDataImpl : public ChannelData { public: - explicit ChannelDataImpl(const grpc_channel_args& args) : ChannelData(args) {} - virtual ~ChannelDataImpl() {} + ChannelDataImpl(const grpc_channel_args& args, const char* peer) + : ChannelData(args, peer) { + IncrementConnectionCounter(); + } }; class CallDataImpl : public CallData { public: explicit CallDataImpl(const ChannelDataImpl& channel_data) : CallData(channel_data) {} - virtual ~CallDataImpl() {} void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) GRPC_OVERRIDE { - if (op->recv_initial_metadata != nullptr) IncrementCounter(); + // Incrementing the counter could be done from the ctor, but we want + // to test that the individual methods are actually called correctly. + if (op->recv_initial_metadata != nullptr) IncrementCallCounter(); grpc_call_next_op(exec_ctx, elem, op); } }; @@ -146,7 +165,8 @@ class FilterEnd2endTest : public ::testing::Test { std::shared_ptr channel = CreateChannel(server_address_.str(), InsecureChannelCredentials()); generic_stub_.reset(new GenericStub(channel)); - ResetCounter(); + ResetConnectionCounter(); + ResetCallCounter(); } void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); } @@ -227,22 +247,27 @@ class FilterEnd2endTest : public ::testing::Test { TEST_F(FilterEnd2endTest, SimpleRpc) { ResetStub(); - EXPECT_EQ(0, GetCounterValue()); + EXPECT_EQ(0, GetConnectionCounterValue()); + EXPECT_EQ(0, GetCallCounterValue()); SendRpc(1); - EXPECT_EQ(1, GetCounterValue()); + EXPECT_EQ(1, GetConnectionCounterValue()); + EXPECT_EQ(1, GetCallCounterValue()); } TEST_F(FilterEnd2endTest, SequentialRpcs) { ResetStub(); - EXPECT_EQ(0, GetCounterValue()); + EXPECT_EQ(0, GetConnectionCounterValue()); + EXPECT_EQ(0, GetCallCounterValue()); SendRpc(10); - EXPECT_EQ(10, GetCounterValue()); + EXPECT_EQ(1, GetConnectionCounterValue()); + EXPECT_EQ(10, GetCallCounterValue()); } // One ping, one pong. TEST_F(FilterEnd2endTest, SimpleBidiStreaming) { ResetStub(); - EXPECT_EQ(0, GetCounterValue()); + EXPECT_EQ(0, GetConnectionCounterValue()); + EXPECT_EQ(0, GetCallCounterValue()); const grpc::string kMethodName( "/grpc.cpp.test.util.EchoTestService/BidiStream"); @@ -306,7 +331,8 @@ TEST_F(FilterEnd2endTest, SimpleBidiStreaming) { EXPECT_EQ(send_response.message(), recv_response.message()); EXPECT_TRUE(recv_status.ok()); - EXPECT_EQ(1, GetCounterValue()); + EXPECT_EQ(1, GetCallCounterValue()); + EXPECT_EQ(1, GetConnectionCounterValue()); } } // namespace -- cgit v1.2.3 From 035cb3a3d4fe9634834b5814827e49adc9cf6b69 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 16 Jun 2016 14:52:41 -0700 Subject: Pass peer string to ChannelData ctor when available. --- include/grpc++/channel_filter.h | 13 +++++++++++-- src/core/lib/transport/transport.h | 8 ++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h index 542e6a0f35..53e42a7cb8 100644 --- a/include/grpc++/channel_filter.h +++ b/include/grpc++/channel_filter.h @@ -62,8 +62,13 @@ class ChannelData { grpc_channel_element *elem, grpc_transport_op *op); + const char* peer() const { return peer_; } + protected: - explicit ChannelData(const grpc_channel_args &) {} + ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {} + + private: + const char *peer_; // Do not own. }; // Represents call data. @@ -96,8 +101,12 @@ class ChannelFilter { static void InitChannelElement(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { + const char* peer = args->optional_transport + ? grpc_transport_get_peer(exec_ctx, + args->optional_transport) + : nullptr; // Construct the object in the already-allocated memory. - new (elem->channel_data) ChannelDataType(*args->channel_args); + new (elem->channel_data) ChannelDataType(*args->channel_args, peer); } static void DestroyChannelElement(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index ed06fc3ed2..107bb802f6 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -43,6 +43,10 @@ #include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/metadata_batch.h" +#ifdef __cplusplus +extern "C" { +#endif + /* forward declarations */ typedef struct grpc_transport grpc_transport; @@ -264,4 +268,8 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *transport); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_TRANSPORT_TRANSPORT_H */ -- cgit v1.2.3 From 4fd2134dafc61af764b18dacee0acd8ef307dcc4 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 20 Jun 2016 09:23:55 -0700 Subject: Make another .h file accessible from C++. --- src/core/lib/security/context/security_context.h | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'src') diff --git a/src/core/lib/security/context/security_context.h b/src/core/lib/security/context/security_context.h index ef0c06b1fb..4e7666dfe3 100644 --- a/src/core/lib/security/context/security_context.h +++ b/src/core/lib/security/context/security_context.h @@ -37,6 +37,10 @@ #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/security/credentials/credentials.h" +#ifdef __cplusplus +extern "C" { +#endif + /* --- grpc_auth_context --- High level authentication context object. Can optionally be chained. */ @@ -111,4 +115,8 @@ grpc_auth_context *grpc_auth_context_from_arg(const grpc_arg *arg); grpc_auth_context *grpc_find_auth_context_in_args( const grpc_channel_args *args); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SECURITY_CONTEXT_SECURITY_CONTEXT_H */ -- cgit v1.2.3 From 083afc225d298db97b37c28034dad1d024bfb0f5 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 20 Jun 2016 13:33:11 -0700 Subject: Add comments indicating what the grpc_call_context_element values are for each array index. --- src/core/lib/channel/context.h | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index c50e84279d..22f4cb62f3 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -34,10 +34,19 @@ #ifndef GRPC_CORE_LIB_CHANNEL_CONTEXT_H #define GRPC_CORE_LIB_CHANNEL_CONTEXT_H -/* Call object context pointers */ +// Call object context pointers. + +// Call context is represented as an array of grpc_call_context_elements. +// This enum represents the indexes into the array, where each index +// contains a different type of value. typedef enum { + // Value is either a grpc_client_security_context or a + // grpc_server_security_context. GRPC_CONTEXT_SECURITY = 0, + + // Value is a census_context. GRPC_CONTEXT_TRACING, + GRPC_CONTEXT_COUNT } grpc_context_index; -- cgit v1.2.3 From 7f8db257974a81e16d6865e8bf030a0bd69c10d3 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 24 Jun 2016 08:24:15 -0700 Subject: Bool-ify a couple of fields in grpc_transport_op. --- src/core/lib/surface/channel.c | 2 +- src/core/lib/surface/server.c | 12 +++++++----- src/core/lib/transport/transport.h | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index f0b3c2e15d..905b4a97dd 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -336,7 +336,7 @@ void grpc_channel_destroy(grpc_channel *channel) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE("grpc_channel_destroy(channel=%p)", 1, (channel)); memset(&op, 0, sizeof(op)); - op.disconnect = 1; + op.disconnect = true; elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); elem->filter->start_transport_op(&exec_ctx, elem, &op); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 88d898bc1c..6677e65c4c 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -270,7 +270,7 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, } static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, - int send_goaway, int send_disconnect) { + bool send_goaway, bool send_disconnect) { grpc_transport_op op; struct shutdown_cleanup_args *sc; grpc_channel_element *elem; @@ -291,8 +291,8 @@ static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, channel_broadcaster *cb, - int send_goaway, - int force_disconnect) { + bool send_goaway, + bool force_disconnect) { size_t i; for (i = 0; i < cb->num_channels; i++) { @@ -1217,7 +1217,8 @@ void grpc_server_shutdown_and_notify(grpc_server *server, l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); } - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 1, 0); + channel_broadcaster_shutdown(&exec_ctx, &broadcaster, true /* send_goaway */, + false /* force_disconnect */); done: grpc_exec_ctx_finish(&exec_ctx); @@ -1233,7 +1234,8 @@ void grpc_server_cancel_all_calls(grpc_server *server) { channel_broadcaster_init(server, &broadcaster); gpr_mu_unlock(&server->mu_global); - channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1); + channel_broadcaster_shutdown(&exec_ctx, &broadcaster, false /* send_goaway */, + true /* force_disconnect */); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 107bb802f6..b170014dc5 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -159,11 +159,11 @@ typedef struct grpc_transport_op { grpc_closure *on_connectivity_state_change; grpc_connectivity_state *connectivity_state; /** should the transport be disconnected */ - int disconnect; + bool disconnect; /** should we send a goaway? after a goaway is sent, once there are no more active calls on the transport, the transport should disconnect */ - int send_goaway; + bool send_goaway; /** what should the goaway contain? */ grpc_status_code goaway_status; gpr_slice *goaway_message; -- cgit v1.2.3 From 13e343464bd8abd01cec1df2b43f624906552055 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 24 Jun 2016 10:24:43 -0700 Subject: Another boolification. --- src/core/lib/transport/metadata_batch.c | 3 ++- src/core/lib/transport/metadata_batch.h | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c index e4398abeb7..84b5a74d51 100644 --- a/src/core/lib/transport/metadata_batch.c +++ b/src/core/lib/transport/metadata_batch.c @@ -33,6 +33,7 @@ #include "src/core/lib/transport/metadata_batch.h" +#include #include #include @@ -187,7 +188,7 @@ void grpc_metadata_batch_clear(grpc_metadata_batch *batch) { grpc_metadata_batch_filter(batch, no_metadata_for_you, NULL); } -int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { +bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) { return batch->list.head == NULL && gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type), batch->deadline) == 0; diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index 7af823f7ca..dad0cc5510 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -34,6 +34,8 @@ #ifndef GRPC_CORE_LIB_TRANSPORT_METADATA_BATCH_H #define GRPC_CORE_LIB_TRANSPORT_METADATA_BATCH_H +#include + #include #include #include @@ -64,7 +66,7 @@ typedef struct grpc_metadata_batch { void grpc_metadata_batch_init(grpc_metadata_batch *batch); void grpc_metadata_batch_destroy(grpc_metadata_batch *batch); void grpc_metadata_batch_clear(grpc_metadata_batch *batch); -int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch); +bool grpc_metadata_batch_is_empty(grpc_metadata_batch *batch); /* Returns the transport size of the batch. */ size_t grpc_metadata_batch_size(grpc_metadata_batch *batch); -- cgit v1.2.3 From 34d07d6f2a0b7d4c938aaf3502151eebba8b04b5 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 27 Jun 2016 09:53:59 -0700 Subject: Update comments. --- src/core/lib/transport/byte_stream.h | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'src') diff --git a/src/core/lib/transport/byte_stream.h b/src/core/lib/transport/byte_stream.h index 95519a9eaf..e64dce6283 100644 --- a/src/core/lib/transport/byte_stream.h +++ b/src/core/lib/transport/byte_stream.h @@ -59,13 +59,9 @@ struct grpc_byte_stream { * on_complete will not be called), 0 if the bytes will be available * asynchronously. * - * on entry, *remaining can be set as a hint as to the maximum number + * max_size_hint can be set as a hint as to the maximum number * of bytes that would be acceptable to read. * - * fills *buffer, *length, *remaining with the bytes, length of bytes - * and length of data remaining to be read before either returning 1 - * or calling on_complete. - * * once a slice is returned into *slice, it is owned by the caller. */ int grpc_byte_stream_next(grpc_exec_ctx *exec_ctx, -- cgit v1.2.3 From 07cd9c9e064e636c432500724bf5f91ad15d041a Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Mon, 27 Jun 2016 10:14:38 -0700 Subject: Initial attempt at a C++ wrapper for the C grpc_transport_op and grpc_transport_stream_op structs. --- include/grpc++/channel_filter.h | 167 ++++++++++++++++++++++++++++++-- src/core/lib/transport/metadata.h | 8 ++ src/core/lib/transport/metadata_batch.h | 8 ++ src/cpp/common/channel_filter.cc | 23 ++++- test/cpp/end2end/filter_end2end_test.cc | 6 +- third_party/protobuf | 2 +- 6 files changed, 200 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h index 262739929a..2fa4ad31d8 100644 --- a/include/grpc++/channel_filter.h +++ b/include/grpc++/channel_filter.h @@ -35,13 +35,16 @@ #define GRPCXX_CHANNEL_FILTER_H #include +#include #include #include #include #include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/security/context/security_context.h" #include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/metadata_batch.h" // // An interface to define filters. @@ -54,16 +57,164 @@ namespace grpc { +// A C++ wrapper for the grpc_metadata_batch struct. +class MetadataBatch { + public: + explicit MetadataBatch(grpc_metadata_batch* batch) : batch_(batch) {} + + grpc_metadata_batch* batch() const { return batch_; } + + // Adds metadata and returns the newly allocated storage. + // The caller takes ownership of the result, which must exist for the + // lifetime of the gRPC call. + grpc_linked_mdelem* AddMetadata(const string& key, const string& value); + + class const_iterator : public std::iterator { + public: + const grpc_mdelem& operator*() const { return *elem_->md; } + const grpc_mdelem* operator->() const { return elem_->md; } + + const_iterator& operator++() { + elem_ = elem_->next; + return *this; + } + const_iterator operator++(int) { + const_iterator tmp(*this); + operator++(); + return tmp; + } + const_iterator& operator--() { + elem_ = elem_->prev; + return *this; + } + const_iterator operator--(int) { + const_iterator tmp(*this); + operator--(); + return tmp; + } + + bool operator==(const const_iterator& other) const { + return elem_ == other.elem_; + } + bool operator!=(const const_iterator& other) const { + return elem_ != other.elem_; + } + + private: + friend class MetadataBatch; + explicit const_iterator(grpc_linked_mdelem* elem) : elem_(elem) {} + + grpc_linked_mdelem* elem_; + }; + + const_iterator begin() const { return const_iterator(batch_->list.head); } + const_iterator end() const { return const_iterator(nullptr); } + + private: + grpc_metadata_batch* batch_; +}; + +// A C++ wrapper for the grpc_transport_op struct. +class TransportOp { + public: + explicit TransportOp(grpc_transport_op* op) : op_(op) {} + + grpc_transport_op* op() const { return op_; } + + bool disconnect() const { return op_->disconnect; } + bool send_goaway() const { return op_->send_goaway; } + + // TODO(roth): Add methods for additional fields as needed. + + private: + grpc_transport_op* op_; // Do not own. +}; + +// A C++ wrapper for the grpc_transport_stream_op struct. +class TransportStreamOp { + public: + explicit TransportStreamOp(grpc_transport_stream_op* op) + : op_(op), + send_initial_metadata_(op->send_initial_metadata), + send_trailing_metadata_(op->send_trailing_metadata), + recv_initial_metadata_(op->recv_initial_metadata), + recv_trailing_metadata_(op->recv_trailing_metadata) {} + + grpc_transport_stream_op* op() const { return op_; } + + grpc_closure* on_complete() const { return op_->on_complete; } + void set_on_complete(grpc_closure* closure) { + op_->on_complete = closure; + } + + MetadataBatch* send_initial_metadata() { + return op_->send_initial_metadata == nullptr + ? nullptr : &send_initial_metadata_; + } + MetadataBatch* send_trailing_metadata() { + return op_->send_trailing_metadata == nullptr + ? nullptr : &send_trailing_metadata_; + } + MetadataBatch* recv_initial_metadata() { + return op_->recv_initial_metadata == nullptr + ? nullptr : &recv_initial_metadata_; + } + MetadataBatch* recv_trailing_metadata() { + return op_->recv_trailing_metadata == nullptr + ? nullptr : &recv_trailing_metadata_; + } + + uint32_t* send_initial_metadata_flags() const { + return &op_->send_initial_metadata_flags; + } + + grpc_closure* recv_initial_metadata_ready() const { + return op_->recv_initial_metadata_ready; + } + void set_recv_initial_metadata_ready(grpc_closure* closure) { + op_->recv_initial_metadata_ready = closure; + } + + grpc_byte_stream* send_message() const { return op_->send_message; } + void set_send_message(grpc_byte_stream* send_message) { + op_->send_message = send_message; + } + + // To be called only on clients and servers, respectively. + grpc_client_security_context* client_security_context() const { + return (grpc_client_security_context*)op_->context[ + GRPC_CONTEXT_SECURITY].value; + } + grpc_server_security_context* server_security_context() const { + return (grpc_server_security_context*)op_->context[ + GRPC_CONTEXT_SECURITY].value; + } + + census_context* get_census_context() const { + return (census_context*)op_->context[GRPC_CONTEXT_TRACING].value; + } + + private: + grpc_transport_stream_op* op_; // Do not own. + MetadataBatch send_initial_metadata_; + MetadataBatch send_trailing_metadata_; + MetadataBatch recv_initial_metadata_; + MetadataBatch recv_trailing_metadata_; +}; + // Represents channel data. class ChannelData { public: virtual ~ChannelData() {} + const char* peer() const { return peer_; } + +// FIXME: find a way to avoid passing elem into these methods +// (same for CallData below) virtual void StartTransportOp(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_transport_op *op); - - const char* peer() const { return peer_; } + TransportOp *op); protected: ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {} @@ -79,7 +230,7 @@ class CallData { virtual void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op); + TransportStreamOp *op); virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, @@ -88,6 +239,8 @@ class CallData { virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); protected: +// FIXME: once PR #7024 has been merged, update this API to provide a +// way to return an error from call initialization explicit CallData(const ChannelData &) {} }; @@ -119,7 +272,8 @@ class ChannelFilter GRPC_FINAL { grpc_channel_element *elem, grpc_transport_op *op) { ChannelDataType *channel_data = (ChannelDataType *)elem->channel_data; - channel_data->StartTransportOp(exec_ctx, elem, op); + TransportOp op_wrapper(op); + channel_data->StartTransportOp(exec_ctx, elem, &op_wrapper); } static const size_t call_data_size = sizeof(CallDataType); @@ -143,7 +297,8 @@ class ChannelFilter GRPC_FINAL { grpc_call_element *elem, grpc_transport_stream_op *op) { CallDataType *call_data = (CallDataType *)elem->call_data; - call_data->StartTransportStreamOp(exec_ctx, elem, op); + TransportStreamOp op_wrapper(op); + call_data->StartTransportStreamOp(exec_ctx, elem, &op_wrapper); } static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index 6d82f4d681..2b0921c8d7 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -37,6 +37,10 @@ #include #include +#ifdef __cplusplus +extern "C" { +#endif + /* This file provides a mechanism for tracking metadata through the grpc stack. It's not intended for consumption outside of the library. @@ -164,4 +168,8 @@ void grpc_mdctx_global_shutdown(void); extern gpr_slice (*grpc_chttp2_base64_encode_and_huffman_compress)( gpr_slice input); +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_TRANSPORT_METADATA_H */ diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h index dad0cc5510..0424b4db98 100644 --- a/src/core/lib/transport/metadata_batch.h +++ b/src/core/lib/transport/metadata_batch.h @@ -42,6 +42,10 @@ #include #include "src/core/lib/transport/metadata.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct grpc_linked_mdelem { grpc_mdelem *md; struct grpc_linked_mdelem *next; @@ -127,4 +131,8 @@ void grpc_metadata_batch_assert_ok(grpc_metadata_batch *comd); } while (0) #endif +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_TRANSPORT_METADATA_BATCH_H */ diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index f473d63a25..b2fe3038f5 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -33,18 +33,33 @@ #include +#include + #include "src/core/lib/channel/channel_stack.h" namespace grpc { +// +// MetadataBatch +// + +grpc_linked_mdelem* MetadataBatch::AddMetadata( + const string& key, const string& value) { + grpc_linked_mdelem *storage = new grpc_linked_mdelem; + memset(storage, 0, sizeof(grpc_linked_mdelem)); + storage->md = grpc_mdelem_from_strings(key.c_str(), value.c_str()); + grpc_metadata_batch_link_head(batch_, storage); + return storage; +} + // // ChannelData // void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, - grpc_transport_op *op) { - grpc_channel_next_op(exec_ctx, elem, op); + TransportOp *op) { + grpc_channel_next_op(exec_ctx, elem, op->op()); } // @@ -53,8 +68,8 @@ void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, void CallData::StartTransportStreamOp(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op *op) { - grpc_call_next_op(exec_ctx, elem, op); + TransportStreamOp *op) { + grpc_call_next_op(exec_ctx, elem, op->op()); } void CallData::SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index b21d377d5d..dcaca10c7f 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -125,11 +125,11 @@ class CallDataImpl : public CallData { : CallData(channel_data) {} void StartTransportStreamOp(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_transport_stream_op* op) GRPC_OVERRIDE { + TransportStreamOp* op) GRPC_OVERRIDE { // Incrementing the counter could be done from the ctor, but we want // to test that the individual methods are actually called correctly. - if (op->recv_initial_metadata != nullptr) IncrementCallCounter(); - grpc_call_next_op(exec_ctx, elem, op); + if (op->recv_initial_metadata() != nullptr) IncrementCallCounter(); + grpc_call_next_op(exec_ctx, elem, op->op()); } }; diff --git a/third_party/protobuf b/third_party/protobuf index 3470b6895a..d4d13a4349 160000 --- a/third_party/protobuf +++ b/third_party/protobuf @@ -1 +1 @@ -Subproject commit 3470b6895aa659b7559ed678e029a5338e535f14 +Subproject commit d4d13a4349e4e59d67f311185ddcc1890d956d7a -- cgit v1.2.3 From bd3e3189d66b8f091e937380fb044aa3e19ca93e Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 29 Jun 2016 08:01:55 -0700 Subject: clang-format --- include/grpc++/channel_filter.h | 123 +++++++++++++++++++-------------------- src/cpp/common/channel_filter.cc | 4 +- 2 files changed, 62 insertions(+), 65 deletions(-) (limited to 'src') diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h index b474722c34..e070307655 100644 --- a/include/grpc++/channel_filter.h +++ b/include/grpc++/channel_filter.h @@ -34,9 +34,9 @@ #ifndef GRPCXX_CHANNEL_FILTER_H #define GRPCXX_CHANNEL_FILTER_H -#include -#include #include +#include +#include #include #include @@ -60,22 +60,22 @@ namespace grpc { // A C++ wrapper for the grpc_metadata_batch struct. class MetadataBatch { public: - explicit MetadataBatch(grpc_metadata_batch* batch) : batch_(batch) {} + explicit MetadataBatch(grpc_metadata_batch *batch) : batch_(batch) {} - grpc_metadata_batch* batch() const { return batch_; } + grpc_metadata_batch *batch() const { return batch_; } // Adds metadata and returns the newly allocated storage. // The caller takes ownership of the result, which must exist for the // lifetime of the gRPC call. - grpc_linked_mdelem* AddMetadata(const string& key, const string& value); + grpc_linked_mdelem *AddMetadata(const string &key, const string &value); class const_iterator : public std::iterator { public: - const grpc_mdelem& operator*() const { return *elem_->md; } - const grpc_mdelem* operator->() const { return elem_->md; } + const grpc_mdelem &operator*() const { return *elem_->md; } + const grpc_mdelem *operator->() const { return elem_->md; } - const_iterator& operator++() { + const_iterator &operator++() { elem_ = elem_->next; return *this; } @@ -84,7 +84,7 @@ class MetadataBatch { operator++(); return tmp; } - const_iterator& operator--() { + const_iterator &operator--() { elem_ = elem_->prev; return *this; } @@ -94,36 +94,36 @@ class MetadataBatch { return tmp; } - bool operator==(const const_iterator& other) const { + bool operator==(const const_iterator &other) const { return elem_ == other.elem_; } - bool operator!=(const const_iterator& other) const { + bool operator!=(const const_iterator &other) const { return elem_ != other.elem_; } private: friend class MetadataBatch; - explicit const_iterator(grpc_linked_mdelem* elem) : elem_(elem) {} + explicit const_iterator(grpc_linked_mdelem *elem) : elem_(elem) {} - grpc_linked_mdelem* elem_; + grpc_linked_mdelem *elem_; }; const_iterator begin() const { return const_iterator(batch_->list.head); } const_iterator end() const { return const_iterator(nullptr); } private: - grpc_metadata_batch* batch_; + grpc_metadata_batch *batch_; }; // A C++ wrapper for the grpc_transport_op struct. class TransportOp { public: - explicit TransportOp(grpc_transport_op* op) : op_(op) {} + explicit TransportOp(grpc_transport_op *op) : op_(op) {} - grpc_transport_op* op() const { return op_; } + grpc_transport_op *op() const { return op_; } -// FIXME: add a C++ wrapper for grpc_error? - grpc_error* disconnect_with_error() const { + // FIXME: add a C++ wrapper for grpc_error? + grpc_error *disconnect_with_error() const { return op_->disconnect_with_error; } bool send_goaway() const { return op_->send_goaway; } @@ -131,75 +131,73 @@ class TransportOp { // TODO(roth): Add methods for additional fields as needed. private: - grpc_transport_op* op_; // Do not own. + grpc_transport_op *op_; // Do not own. }; // A C++ wrapper for the grpc_transport_stream_op struct. class TransportStreamOp { public: - explicit TransportStreamOp(grpc_transport_stream_op* op) + explicit TransportStreamOp(grpc_transport_stream_op *op) : op_(op), send_initial_metadata_(op->send_initial_metadata), send_trailing_metadata_(op->send_trailing_metadata), recv_initial_metadata_(op->recv_initial_metadata), recv_trailing_metadata_(op->recv_trailing_metadata) {} - grpc_transport_stream_op* op() const { return op_; } + grpc_transport_stream_op *op() const { return op_; } - grpc_closure* on_complete() const { return op_->on_complete; } - void set_on_complete(grpc_closure* closure) { - op_->on_complete = closure; - } + grpc_closure *on_complete() const { return op_->on_complete; } + void set_on_complete(grpc_closure *closure) { op_->on_complete = closure; } - MetadataBatch* send_initial_metadata() { - return op_->send_initial_metadata == nullptr - ? nullptr : &send_initial_metadata_; + MetadataBatch *send_initial_metadata() { + return op_->send_initial_metadata == nullptr ? nullptr + : &send_initial_metadata_; } - MetadataBatch* send_trailing_metadata() { - return op_->send_trailing_metadata == nullptr - ? nullptr : &send_trailing_metadata_; + MetadataBatch *send_trailing_metadata() { + return op_->send_trailing_metadata == nullptr ? nullptr + : &send_trailing_metadata_; } - MetadataBatch* recv_initial_metadata() { - return op_->recv_initial_metadata == nullptr - ? nullptr : &recv_initial_metadata_; + MetadataBatch *recv_initial_metadata() { + return op_->recv_initial_metadata == nullptr ? nullptr + : &recv_initial_metadata_; } - MetadataBatch* recv_trailing_metadata() { - return op_->recv_trailing_metadata == nullptr - ? nullptr : &recv_trailing_metadata_; + MetadataBatch *recv_trailing_metadata() { + return op_->recv_trailing_metadata == nullptr ? nullptr + : &recv_trailing_metadata_; } - uint32_t* send_initial_metadata_flags() const { + uint32_t *send_initial_metadata_flags() const { return &op_->send_initial_metadata_flags; } - grpc_closure* recv_initial_metadata_ready() const { + grpc_closure *recv_initial_metadata_ready() const { return op_->recv_initial_metadata_ready; } - void set_recv_initial_metadata_ready(grpc_closure* closure) { + void set_recv_initial_metadata_ready(grpc_closure *closure) { op_->recv_initial_metadata_ready = closure; } - grpc_byte_stream* send_message() const { return op_->send_message; } - void set_send_message(grpc_byte_stream* send_message) { + grpc_byte_stream *send_message() const { return op_->send_message; } + void set_send_message(grpc_byte_stream *send_message) { op_->send_message = send_message; } // To be called only on clients and servers, respectively. - grpc_client_security_context* client_security_context() const { - return (grpc_client_security_context*)op_->context[ - GRPC_CONTEXT_SECURITY].value; + grpc_client_security_context *client_security_context() const { + return (grpc_client_security_context *)op_->context[GRPC_CONTEXT_SECURITY] + .value; } - grpc_server_security_context* server_security_context() const { - return (grpc_server_security_context*)op_->context[ - GRPC_CONTEXT_SECURITY].value; + grpc_server_security_context *server_security_context() const { + return (grpc_server_security_context *)op_->context[GRPC_CONTEXT_SECURITY] + .value; } - census_context* get_census_context() const { - return (census_context*)op_->context[GRPC_CONTEXT_TRACING].value; + census_context *get_census_context() const { + return (census_context *)op_->context[GRPC_CONTEXT_TRACING].value; } private: - grpc_transport_stream_op* op_; // Do not own. + grpc_transport_stream_op *op_; // Do not own. MetadataBatch send_initial_metadata_; MetadataBatch send_trailing_metadata_; MetadataBatch recv_initial_metadata_; @@ -211,13 +209,12 @@ class ChannelData { public: virtual ~ChannelData() {} - const char* peer() const { return peer_; } + const char *peer() const { return peer_; } -// FIXME: find a way to avoid passing elem into these methods -// (same for CallData below) + // FIXME: find a way to avoid passing elem into these methods + // (same for CallData below) virtual void StartTransportOp(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - TransportOp *op); + grpc_channel_element *elem, TransportOp *op); protected: ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {} @@ -231,7 +228,7 @@ class CallData { public: virtual ~CallData() {} - virtual grpc_error* Init() { return GRPC_ERROR_NONE; } + virtual grpc_error *Init() { return GRPC_ERROR_NONE; } virtual void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, @@ -258,10 +255,10 @@ class ChannelFilter GRPC_FINAL { static void InitChannelElement(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { - const char* peer = args->optional_transport - ? grpc_transport_get_peer(exec_ctx, - args->optional_transport) - : nullptr; + const char *peer = + args->optional_transport + ? grpc_transport_get_peer(exec_ctx, args->optional_transport) + : nullptr; // Construct the object in the already-allocated memory. new (elem->channel_data) ChannelDataType(*args->channel_args, peer); } @@ -281,13 +278,13 @@ class ChannelFilter GRPC_FINAL { static const size_t call_data_size = sizeof(CallDataType); - static grpc_error* InitCallElement(grpc_exec_ctx *exec_ctx, + static grpc_error *InitCallElement(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { const ChannelDataType &channel_data = *(ChannelDataType *)elem->channel_data; // Construct the object in the already-allocated memory. - CallDataType* call_data = new (elem->call_data) CallDataType(channel_data); + CallDataType *call_data = new (elem->call_data) CallDataType(channel_data); return call_data->Init(); } diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index b2fe3038f5..ab43b8ac3c 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -43,8 +43,8 @@ namespace grpc { // MetadataBatch // -grpc_linked_mdelem* MetadataBatch::AddMetadata( - const string& key, const string& value) { +grpc_linked_mdelem *MetadataBatch::AddMetadata(const string &key, + const string &value) { grpc_linked_mdelem *storage = new grpc_linked_mdelem; memset(storage, 0, sizeof(grpc_linked_mdelem)); storage->md = grpc_mdelem_from_strings(key.c_str(), value.c_str()); -- cgit v1.2.3 From ab950ee7c5211cc8e1cfc47f5adf716496899c32 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 29 Jun 2016 14:51:53 -0700 Subject: Move channel_filter.h from include/ tree to src/ tree. --- BUILD | 4 +- Makefile | 3 +- build.yaml | 2 +- include/grpc++/channel_filter.h | 365 -------------------- src/cpp/common/channel_filter.cc | 3 +- src/cpp/common/channel_filter.h | 366 +++++++++++++++++++++ test/cpp/end2end/filter_end2end_test.cc | 2 +- tools/doxygen/Doxyfile.c++ | 1 - tools/doxygen/Doxyfile.c++.internal | 2 +- tools/run_tests/sources_and_headers.json | 4 +- vsprojects/vcxproj/grpc++/grpc++.vcxproj | 2 +- vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters | 6 +- .../grpc++_unsecure/grpc++_unsecure.vcxproj | 2 +- .../grpc++_unsecure.vcxproj.filters | 6 +- 14 files changed, 383 insertions(+), 385 deletions(-) delete mode 100644 include/grpc++/channel_filter.h create mode 100644 src/cpp/common/channel_filter.h (limited to 'src') diff --git a/BUILD b/BUILD index ce324887e3..86725b6f1c 100644 --- a/BUILD +++ b/BUILD @@ -1224,6 +1224,7 @@ cc_library( "src/cpp/common/secure_auth_context.h", "src/cpp/server/secure_server_credentials.h", "src/cpp/client/create_channel_internal.h", + "src/cpp/common/channel_filter.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", "src/cpp/client/secure_credentials.cc", @@ -1264,7 +1265,6 @@ cc_library( hdrs = [ "include/grpc++/alarm.h", "include/grpc++/channel.h", - "include/grpc++/channel_filter.h", "include/grpc++/client_context.h", "include/grpc++/completion_queue.h", "include/grpc++/create_channel.h", @@ -1457,6 +1457,7 @@ cc_library( name = "grpc++_unsecure", srcs = [ "src/cpp/client/create_channel_internal.h", + "src/cpp/common/channel_filter.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h", "src/cpp/common/insecure_create_auth_context.cc", @@ -1492,7 +1493,6 @@ cc_library( hdrs = [ "include/grpc++/alarm.h", "include/grpc++/channel.h", - "include/grpc++/channel_filter.h", "include/grpc++/client_context.h", "include/grpc++/completion_queue.h", "include/grpc++/create_channel.h", diff --git a/Makefile b/Makefile index 1a4a303ad3..d2c263cc97 100644 --- a/Makefile +++ b/Makefile @@ -1489,6 +1489,7 @@ buildtests_cxx: buildtests_zookeeper privatelibs_cxx \ $(BINDIR)/$(CONFIG)/cxx_string_ref_test \ $(BINDIR)/$(CONFIG)/cxx_time_test \ $(BINDIR)/$(CONFIG)/end2end_test \ + $(BINDIR)/$(CONFIG)/filter_end2end_test \ $(BINDIR)/$(CONFIG)/generic_end2end_test \ $(BINDIR)/$(CONFIG)/golden_file_test \ $(BINDIR)/$(CONFIG)/grpc_cli \ @@ -3497,7 +3498,6 @@ LIBGRPC++_SRC = \ PUBLIC_HEADERS_CXX += \ include/grpc++/alarm.h \ include/grpc++/channel.h \ - include/grpc++/channel_filter.h \ include/grpc++/client_context.h \ include/grpc++/completion_queue.h \ include/grpc++/create_channel.h \ @@ -3986,7 +3986,6 @@ LIBGRPC++_UNSECURE_SRC = \ PUBLIC_HEADERS_CXX += \ include/grpc++/alarm.h \ include/grpc++/channel.h \ - include/grpc++/channel_filter.h \ include/grpc++/client_context.h \ include/grpc++/completion_queue.h \ include/grpc++/create_channel.h \ diff --git a/build.yaml b/build.yaml index b1a11d503d..0a78e52775 100644 --- a/build.yaml +++ b/build.yaml @@ -638,7 +638,6 @@ filegroups: public_headers: - include/grpc++/alarm.h - include/grpc++/channel.h - - include/grpc++/channel_filter.h - include/grpc++/client_context.h - include/grpc++/completion_queue.h - include/grpc++/create_channel.h @@ -686,6 +685,7 @@ filegroups: - include/grpc++/support/time.h headers: - src/cpp/client/create_channel_internal.h + - src/cpp/common/channel_filter.h - src/cpp/server/dynamic_thread_pool.h - src/cpp/server/thread_pool_interface.h src: diff --git a/include/grpc++/channel_filter.h b/include/grpc++/channel_filter.h deleted file mode 100644 index e070307655..0000000000 --- a/include/grpc++/channel_filter.h +++ /dev/null @@ -1,365 +0,0 @@ -/* - * - * Copyright 2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPCXX_CHANNEL_FILTER_H -#define GRPCXX_CHANNEL_FILTER_H - -#include -#include -#include - -#include -#include - -#include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/security/context/security_context.h" -#include "src/core/lib/surface/channel_init.h" -#include "src/core/lib/transport/metadata_batch.h" - -// -// An interface to define filters. -// -// To define a filter, implement a subclass of each of CallData and -// ChannelData. Then register the filter using something like this: -// RegisterChannelFilter( -// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); -// - -namespace grpc { - -// A C++ wrapper for the grpc_metadata_batch struct. -class MetadataBatch { - public: - explicit MetadataBatch(grpc_metadata_batch *batch) : batch_(batch) {} - - grpc_metadata_batch *batch() const { return batch_; } - - // Adds metadata and returns the newly allocated storage. - // The caller takes ownership of the result, which must exist for the - // lifetime of the gRPC call. - grpc_linked_mdelem *AddMetadata(const string &key, const string &value); - - class const_iterator : public std::iterator { - public: - const grpc_mdelem &operator*() const { return *elem_->md; } - const grpc_mdelem *operator->() const { return elem_->md; } - - const_iterator &operator++() { - elem_ = elem_->next; - return *this; - } - const_iterator operator++(int) { - const_iterator tmp(*this); - operator++(); - return tmp; - } - const_iterator &operator--() { - elem_ = elem_->prev; - return *this; - } - const_iterator operator--(int) { - const_iterator tmp(*this); - operator--(); - return tmp; - } - - bool operator==(const const_iterator &other) const { - return elem_ == other.elem_; - } - bool operator!=(const const_iterator &other) const { - return elem_ != other.elem_; - } - - private: - friend class MetadataBatch; - explicit const_iterator(grpc_linked_mdelem *elem) : elem_(elem) {} - - grpc_linked_mdelem *elem_; - }; - - const_iterator begin() const { return const_iterator(batch_->list.head); } - const_iterator end() const { return const_iterator(nullptr); } - - private: - grpc_metadata_batch *batch_; -}; - -// A C++ wrapper for the grpc_transport_op struct. -class TransportOp { - public: - explicit TransportOp(grpc_transport_op *op) : op_(op) {} - - grpc_transport_op *op() const { return op_; } - - // FIXME: add a C++ wrapper for grpc_error? - grpc_error *disconnect_with_error() const { - return op_->disconnect_with_error; - } - bool send_goaway() const { return op_->send_goaway; } - - // TODO(roth): Add methods for additional fields as needed. - - private: - grpc_transport_op *op_; // Do not own. -}; - -// A C++ wrapper for the grpc_transport_stream_op struct. -class TransportStreamOp { - public: - explicit TransportStreamOp(grpc_transport_stream_op *op) - : op_(op), - send_initial_metadata_(op->send_initial_metadata), - send_trailing_metadata_(op->send_trailing_metadata), - recv_initial_metadata_(op->recv_initial_metadata), - recv_trailing_metadata_(op->recv_trailing_metadata) {} - - grpc_transport_stream_op *op() const { return op_; } - - grpc_closure *on_complete() const { return op_->on_complete; } - void set_on_complete(grpc_closure *closure) { op_->on_complete = closure; } - - MetadataBatch *send_initial_metadata() { - return op_->send_initial_metadata == nullptr ? nullptr - : &send_initial_metadata_; - } - MetadataBatch *send_trailing_metadata() { - return op_->send_trailing_metadata == nullptr ? nullptr - : &send_trailing_metadata_; - } - MetadataBatch *recv_initial_metadata() { - return op_->recv_initial_metadata == nullptr ? nullptr - : &recv_initial_metadata_; - } - MetadataBatch *recv_trailing_metadata() { - return op_->recv_trailing_metadata == nullptr ? nullptr - : &recv_trailing_metadata_; - } - - uint32_t *send_initial_metadata_flags() const { - return &op_->send_initial_metadata_flags; - } - - grpc_closure *recv_initial_metadata_ready() const { - return op_->recv_initial_metadata_ready; - } - void set_recv_initial_metadata_ready(grpc_closure *closure) { - op_->recv_initial_metadata_ready = closure; - } - - grpc_byte_stream *send_message() const { return op_->send_message; } - void set_send_message(grpc_byte_stream *send_message) { - op_->send_message = send_message; - } - - // To be called only on clients and servers, respectively. - grpc_client_security_context *client_security_context() const { - return (grpc_client_security_context *)op_->context[GRPC_CONTEXT_SECURITY] - .value; - } - grpc_server_security_context *server_security_context() const { - return (grpc_server_security_context *)op_->context[GRPC_CONTEXT_SECURITY] - .value; - } - - census_context *get_census_context() const { - return (census_context *)op_->context[GRPC_CONTEXT_TRACING].value; - } - - private: - grpc_transport_stream_op *op_; // Do not own. - MetadataBatch send_initial_metadata_; - MetadataBatch send_trailing_metadata_; - MetadataBatch recv_initial_metadata_; - MetadataBatch recv_trailing_metadata_; -}; - -// Represents channel data. -class ChannelData { - public: - virtual ~ChannelData() {} - - const char *peer() const { return peer_; } - - // FIXME: find a way to avoid passing elem into these methods - // (same for CallData below) - virtual void StartTransportOp(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, TransportOp *op); - - protected: - ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {} - - private: - const char *peer_; // Do not own. -}; - -// Represents call data. -class CallData { - public: - virtual ~CallData() {} - - virtual grpc_error *Init() { return GRPC_ERROR_NONE; } - - virtual void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - TransportStreamOp *op); - - virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_polling_entity *pollent); - - virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); - - protected: - explicit CallData(const ChannelData &) {} -}; - -namespace internal { - -// Defines static members for passing to C core. -template -class ChannelFilter GRPC_FINAL { - public: - static const size_t channel_data_size = sizeof(ChannelDataType); - - static void InitChannelElement(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_channel_element_args *args) { - const char *peer = - args->optional_transport - ? grpc_transport_get_peer(exec_ctx, args->optional_transport) - : nullptr; - // Construct the object in the already-allocated memory. - new (elem->channel_data) ChannelDataType(*args->channel_args, peer); - } - - static void DestroyChannelElement(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem) { - reinterpret_cast(elem->channel_data)->~ChannelDataType(); - } - - static void StartTransportOp(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { - ChannelDataType *channel_data = (ChannelDataType *)elem->channel_data; - TransportOp op_wrapper(op); - channel_data->StartTransportOp(exec_ctx, elem, &op_wrapper); - } - - static const size_t call_data_size = sizeof(CallDataType); - - static grpc_error *InitCallElement(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_call_element_args *args) { - const ChannelDataType &channel_data = - *(ChannelDataType *)elem->channel_data; - // Construct the object in the already-allocated memory. - CallDataType *call_data = new (elem->call_data) CallDataType(channel_data); - return call_data->Init(); - } - - static void DestroyCallElement(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - const grpc_call_stats *stats, - void *and_free_memory) { - reinterpret_cast(elem->call_data)->~CallDataType(); - } - - static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - CallDataType *call_data = (CallDataType *)elem->call_data; - TransportStreamOp op_wrapper(op); - call_data->StartTransportStreamOp(exec_ctx, elem, &op_wrapper); - } - - static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_polling_entity *pollent) { - CallDataType *call_data = (CallDataType *)elem->call_data; - call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent); - } - - static char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - CallDataType *call_data = (CallDataType *)elem->call_data; - return call_data->GetPeer(exec_ctx, elem); - } -}; - -struct FilterRecord { - grpc_channel_stack_type stack_type; - int priority; - std::function include_filter; - grpc_channel_filter filter; -}; -extern std::vector *channel_filters; - -void ChannelFilterPluginInit(); -void ChannelFilterPluginShutdown(); - -} // namespace internal - -// Registers a new filter. -// Must be called by only one thread at a time. -// The include_filter argument specifies a function that will be called -// to determine at run-time whether or not to add the filter. If the -// value is nullptr, the filter will be added unconditionally. -template -void RegisterChannelFilter( - const char *name, grpc_channel_stack_type stack_type, int priority, - std::function include_filter) { - // If we haven't been called before, initialize channel_filters and - // call grpc_register_plugin(). - if (internal::channel_filters == nullptr) { - grpc_register_plugin(internal::ChannelFilterPluginInit, - internal::ChannelFilterPluginShutdown); - internal::channel_filters = new std::vector(); - } - // Add an entry to channel_filters. The filter will be added when the - // C-core initialization code calls ChannelFilterPluginInit(). - typedef internal::ChannelFilter FilterType; - internal::FilterRecord filter_record = { - stack_type, - priority, - include_filter, - {FilterType::StartTransportStreamOp, FilterType::StartTransportOp, - FilterType::call_data_size, FilterType::InitCallElement, - FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement, - FilterType::channel_data_size, FilterType::InitChannelElement, - FilterType::DestroyChannelElement, FilterType::GetPeer, name}}; - internal::channel_filters->push_back(filter_record); -} - -} // namespace grpc - -#endif // GRPCXX_CHANNEL_FILTER_H diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index ab43b8ac3c..8a4149bbca 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -31,11 +31,10 @@ * */ -#include - #include #include "src/core/lib/channel/channel_stack.h" +#include "src/cpp/common/channel_filter.h" namespace grpc { diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h new file mode 100644 index 0000000000..437c7a2759 --- /dev/null +++ b/src/cpp/common/channel_filter.h @@ -0,0 +1,366 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPCXX_CHANNEL_FILTER_H +#define GRPCXX_CHANNEL_FILTER_H + +#include +#include +#include + +#include +#include + +#include "src/core/lib/channel/channel_stack.h" +#include "src/core/lib/security/context/security_context.h" +#include "src/core/lib/surface/channel_init.h" +#include "src/core/lib/transport/metadata_batch.h" + +// +// An interface to define filters. +// +// To define a filter, implement a subclass of each of CallData and +// ChannelData. Then register the filter using something like this: +// RegisterChannelFilter( +// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); +// + +namespace grpc { + +// A C++ wrapper for the grpc_metadata_batch struct. +class MetadataBatch { + public: + explicit MetadataBatch(grpc_metadata_batch *batch) : batch_(batch) {} + + grpc_metadata_batch *batch() const { return batch_; } + + // Adds metadata and returns the newly allocated storage. + // The caller takes ownership of the result, which must exist for the + // lifetime of the gRPC call. + grpc_linked_mdelem *AddMetadata(const string &key, const string &value); + + class const_iterator : public std::iterator { + public: + const grpc_mdelem &operator*() const { return *elem_->md; } + const grpc_mdelem *operator->() const { return elem_->md; } + + const_iterator &operator++() { + elem_ = elem_->next; + return *this; + } + const_iterator operator++(int) { + const_iterator tmp(*this); + operator++(); + return tmp; + } + const_iterator &operator--() { + elem_ = elem_->prev; + return *this; + } + const_iterator operator--(int) { + const_iterator tmp(*this); + operator--(); + return tmp; + } + + bool operator==(const const_iterator &other) const { + return elem_ == other.elem_; + } + bool operator!=(const const_iterator &other) const { + return elem_ != other.elem_; + } + + private: + friend class MetadataBatch; + explicit const_iterator(grpc_linked_mdelem *elem) : elem_(elem) {} + + grpc_linked_mdelem *elem_; + }; + + const_iterator begin() const { return const_iterator(batch_->list.head); } + const_iterator end() const { return const_iterator(nullptr); } + + private: + grpc_metadata_batch *batch_; +}; + +// A C++ wrapper for the grpc_transport_op struct. +class TransportOp { + public: + explicit TransportOp(grpc_transport_op *op) : op_(op) {} + + grpc_transport_op *op() const { return op_; } + + // TODO(roth): Add a C++ wrapper for grpc_error? + grpc_error *disconnect_with_error() const { + return op_->disconnect_with_error; + } + bool send_goaway() const { return op_->send_goaway; } + + // TODO(roth): Add methods for additional fields as needed. + + private: + grpc_transport_op *op_; // Do not own. +}; + +// A C++ wrapper for the grpc_transport_stream_op struct. +class TransportStreamOp { + public: + explicit TransportStreamOp(grpc_transport_stream_op *op) + : op_(op), + send_initial_metadata_(op->send_initial_metadata), + send_trailing_metadata_(op->send_trailing_metadata), + recv_initial_metadata_(op->recv_initial_metadata), + recv_trailing_metadata_(op->recv_trailing_metadata) {} + + grpc_transport_stream_op *op() const { return op_; } + + grpc_closure *on_complete() const { return op_->on_complete; } + void set_on_complete(grpc_closure *closure) { op_->on_complete = closure; } + + MetadataBatch *send_initial_metadata() { + return op_->send_initial_metadata == nullptr ? nullptr + : &send_initial_metadata_; + } + MetadataBatch *send_trailing_metadata() { + return op_->send_trailing_metadata == nullptr ? nullptr + : &send_trailing_metadata_; + } + MetadataBatch *recv_initial_metadata() { + return op_->recv_initial_metadata == nullptr ? nullptr + : &recv_initial_metadata_; + } + MetadataBatch *recv_trailing_metadata() { + return op_->recv_trailing_metadata == nullptr ? nullptr + : &recv_trailing_metadata_; + } + + uint32_t *send_initial_metadata_flags() const { + return &op_->send_initial_metadata_flags; + } + + grpc_closure *recv_initial_metadata_ready() const { + return op_->recv_initial_metadata_ready; + } + void set_recv_initial_metadata_ready(grpc_closure *closure) { + op_->recv_initial_metadata_ready = closure; + } + + grpc_byte_stream *send_message() const { return op_->send_message; } + void set_send_message(grpc_byte_stream *send_message) { + op_->send_message = send_message; + } + + // To be called only on clients and servers, respectively. + grpc_client_security_context *client_security_context() const { + return (grpc_client_security_context *)op_->context[GRPC_CONTEXT_SECURITY] + .value; + } + grpc_server_security_context *server_security_context() const { + return (grpc_server_security_context *)op_->context[GRPC_CONTEXT_SECURITY] + .value; + } + + census_context *get_census_context() const { + return (census_context *)op_->context[GRPC_CONTEXT_TRACING].value; + } + + private: + grpc_transport_stream_op *op_; // Do not own. + MetadataBatch send_initial_metadata_; + MetadataBatch send_trailing_metadata_; + MetadataBatch recv_initial_metadata_; + MetadataBatch recv_trailing_metadata_; +}; + +// Represents channel data. +class ChannelData { + public: + virtual ~ChannelData() {} + + const char *peer() const { return peer_; } + + // TODO(roth): Find a way to avoid passing elem into these methods. + virtual void StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, TransportOp *op); + + protected: + ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {} + + private: + const char *peer_; // Do not own. +}; + +// Represents call data. +class CallData { + public: + virtual ~CallData() {} + + virtual grpc_error *Init() { return GRPC_ERROR_NONE; } + + // TODO(roth): Find a way to avoid passing elem into these methods. + + virtual void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + TransportStreamOp *op); + + virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent); + + virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + + protected: + explicit CallData(const ChannelData &) {} +}; + +namespace internal { + +// Defines static members for passing to C core. +template +class ChannelFilter GRPC_FINAL { + public: + static const size_t channel_data_size = sizeof(ChannelDataType); + + static void InitChannelElement(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + const char *peer = + args->optional_transport + ? grpc_transport_get_peer(exec_ctx, args->optional_transport) + : nullptr; + // Construct the object in the already-allocated memory. + new (elem->channel_data) ChannelDataType(*args->channel_args, peer); + } + + static void DestroyChannelElement(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { + reinterpret_cast(elem->channel_data)->~ChannelDataType(); + } + + static void StartTransportOp(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + ChannelDataType *channel_data = (ChannelDataType *)elem->channel_data; + TransportOp op_wrapper(op); + channel_data->StartTransportOp(exec_ctx, elem, &op_wrapper); + } + + static const size_t call_data_size = sizeof(CallDataType); + + static grpc_error *InitCallElement(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_call_element_args *args) { + const ChannelDataType &channel_data = + *(ChannelDataType *)elem->channel_data; + // Construct the object in the already-allocated memory. + CallDataType *call_data = new (elem->call_data) CallDataType(channel_data); + return call_data->Init(); + } + + static void DestroyCallElement(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_stats *stats, + void *and_free_memory) { + reinterpret_cast(elem->call_data)->~CallDataType(); + } + + static void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + CallDataType *call_data = (CallDataType *)elem->call_data; + TransportStreamOp op_wrapper(op); + call_data->StartTransportStreamOp(exec_ctx, elem, &op_wrapper); + } + + static void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_polling_entity *pollent) { + CallDataType *call_data = (CallDataType *)elem->call_data; + call_data->SetPollsetOrPollsetSet(exec_ctx, elem, pollent); + } + + static char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { + CallDataType *call_data = (CallDataType *)elem->call_data; + return call_data->GetPeer(exec_ctx, elem); + } +}; + +struct FilterRecord { + grpc_channel_stack_type stack_type; + int priority; + std::function include_filter; + grpc_channel_filter filter; +}; +extern std::vector *channel_filters; + +void ChannelFilterPluginInit(); +void ChannelFilterPluginShutdown(); + +} // namespace internal + +// Registers a new filter. +// Must be called by only one thread at a time. +// The include_filter argument specifies a function that will be called +// to determine at run-time whether or not to add the filter. If the +// value is nullptr, the filter will be added unconditionally. +template +void RegisterChannelFilter( + const char *name, grpc_channel_stack_type stack_type, int priority, + std::function include_filter) { + // If we haven't been called before, initialize channel_filters and + // call grpc_register_plugin(). + if (internal::channel_filters == nullptr) { + grpc_register_plugin(internal::ChannelFilterPluginInit, + internal::ChannelFilterPluginShutdown); + internal::channel_filters = new std::vector(); + } + // Add an entry to channel_filters. The filter will be added when the + // C-core initialization code calls ChannelFilterPluginInit(). + typedef internal::ChannelFilter FilterType; + internal::FilterRecord filter_record = { + stack_type, + priority, + include_filter, + {FilterType::StartTransportStreamOp, FilterType::StartTransportOp, + FilterType::call_data_size, FilterType::InitCallElement, + FilterType::SetPollsetOrPollsetSet, FilterType::DestroyCallElement, + FilterType::channel_data_size, FilterType::InitChannelElement, + FilterType::DestroyChannelElement, FilterType::GetPeer, name}}; + internal::channel_filters->push_back(filter_record); +} + +} // namespace grpc + +#endif // GRPCXX_CHANNEL_FILTER_H diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc index dcaca10c7f..576d440c9b 100644 --- a/test/cpp/end2end/filter_end2end_test.cc +++ b/test/cpp/end2end/filter_end2end_test.cc @@ -35,7 +35,6 @@ #include #include -#include #include #include #include @@ -50,6 +49,7 @@ #include #include +#include "src/cpp/common/channel_filter.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" diff --git a/tools/doxygen/Doxyfile.c++ b/tools/doxygen/Doxyfile.c++ index e770574cb1..7f9d2df6f6 100644 --- a/tools/doxygen/Doxyfile.c++ +++ b/tools/doxygen/Doxyfile.c++ @@ -762,7 +762,6 @@ WARN_LOGFILE = INPUT = include/grpc++/alarm.h \ include/grpc++/channel.h \ -include/grpc++/channel_filter.h \ include/grpc++/client_context.h \ include/grpc++/completion_queue.h \ include/grpc++/create_channel.h \ diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index a3c4a10926..3c81c48d4a 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -762,7 +762,6 @@ WARN_LOGFILE = INPUT = include/grpc++/alarm.h \ include/grpc++/channel.h \ -include/grpc++/channel_filter.h \ include/grpc++/client_context.h \ include/grpc++/completion_queue.h \ include/grpc++/create_channel.h \ @@ -864,6 +863,7 @@ src/cpp/client/secure_credentials.h \ src/cpp/common/secure_auth_context.h \ src/cpp/server/secure_server_credentials.h \ src/cpp/client/create_channel_internal.h \ +src/cpp/common/channel_filter.h \ src/cpp/server/dynamic_thread_pool.h \ src/cpp/server/thread_pool_interface.h \ src/cpp/client/secure_credentials.cc \ diff --git a/tools/run_tests/sources_and_headers.json b/tools/run_tests/sources_and_headers.json index 7d16d15eb4..e55d8a0997 100644 --- a/tools/run_tests/sources_and_headers.json +++ b/tools/run_tests/sources_and_headers.json @@ -6537,7 +6537,6 @@ "headers": [ "include/grpc++/alarm.h", "include/grpc++/channel.h", - "include/grpc++/channel_filter.h", "include/grpc++/client_context.h", "include/grpc++/completion_queue.h", "include/grpc++/create_channel.h", @@ -6584,6 +6583,7 @@ "include/grpc++/support/sync_stream.h", "include/grpc++/support/time.h", "src/cpp/client/create_channel_internal.h", + "src/cpp/common/channel_filter.h", "src/cpp/server/dynamic_thread_pool.h", "src/cpp/server/thread_pool_interface.h" ], @@ -6592,7 +6592,6 @@ "src": [ "include/grpc++/alarm.h", "include/grpc++/channel.h", - "include/grpc++/channel_filter.h", "include/grpc++/client_context.h", "include/grpc++/completion_queue.h", "include/grpc++/create_channel.h", @@ -6649,6 +6648,7 @@ "src/cpp/client/insecure_credentials.cc", "src/cpp/common/channel_arguments.cc", "src/cpp/common/channel_filter.cc", + "src/cpp/common/channel_filter.h", "src/cpp/common/completion_queue.cc", "src/cpp/common/core_codegen.cc", "src/cpp/common/rpc_method.cc", diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj b/vsprojects/vcxproj/grpc++/grpc++.vcxproj index b882c302bb..835e2527c9 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj @@ -260,7 +260,6 @@ - @@ -364,6 +363,7 @@ + diff --git a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters index 08fffb74b2..883e66e1df 100644 --- a/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++/grpc++.vcxproj.filters @@ -111,9 +111,6 @@ include\grpc++ - - include\grpc++ - include\grpc++ @@ -419,6 +416,9 @@ src\cpp\client + + src\cpp\common + src\cpp\server diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj index b5a27f624d..e71180feb0 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj @@ -260,7 +260,6 @@ - @@ -360,6 +359,7 @@ + diff --git a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters index 68d9a47973..a9aa147e56 100644 --- a/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters +++ b/vsprojects/vcxproj/grpc++_unsecure/grpc++_unsecure.vcxproj.filters @@ -96,9 +96,6 @@ include\grpc++ - - include\grpc++ - include\grpc++ @@ -392,6 +389,9 @@ src\cpp\client + + src\cpp\common + src\cpp\server -- cgit v1.2.3 From 97b173dfb8d10bc68dcffa135762d2d152723bc6 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 29 Jun 2016 15:07:35 -0700 Subject: Addressed reviewer comments. --- src/cpp/common/channel_filter.h | 55 +++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 437c7a2759..f3cbdb6224 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -37,6 +37,7 @@ #include #include #include +#include #include #include @@ -46,18 +47,20 @@ #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/metadata_batch.h" -// -// An interface to define filters. -// -// To define a filter, implement a subclass of each of CallData and -// ChannelData. Then register the filter using something like this: -// RegisterChannelFilter( -// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); -// +/// +/// An interface to define filters. +/// +/// To define a filter, implement a subclass of each of \c CallData and +/// \c ChannelData. Then register the filter using something like this: +/// \code{.cpp} +/// RegisterChannelFilter( +/// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); +/// \endcode +/// namespace grpc { -// A C++ wrapper for the grpc_metadata_batch struct. +/// A C++ wrapper for the \c grpc_metadata_batch struct. class MetadataBatch { public: explicit MetadataBatch(grpc_metadata_batch *batch) : batch_(batch) {} @@ -112,10 +115,10 @@ class MetadataBatch { const_iterator end() const { return const_iterator(nullptr); } private: - grpc_metadata_batch *batch_; + grpc_metadata_batch *batch_; // Not owned. }; -// A C++ wrapper for the grpc_transport_op struct. +/// A C++ wrapper for the \c grpc_transport_op struct. class TransportOp { public: explicit TransportOp(grpc_transport_op *op) : op_(op) {} @@ -131,10 +134,10 @@ class TransportOp { // TODO(roth): Add methods for additional fields as needed. private: - grpc_transport_op *op_; // Do not own. + grpc_transport_op *op_; // Not owned. }; -// A C++ wrapper for the grpc_transport_stream_op struct. +/// A C++ wrapper for the \c grpc_transport_stream_op struct. class TransportStreamOp { public: explicit TransportStreamOp(grpc_transport_stream_op *op) @@ -197,18 +200,21 @@ class TransportStreamOp { } private: - grpc_transport_stream_op *op_; // Do not own. + grpc_transport_stream_op *op_; // Not owned. MetadataBatch send_initial_metadata_; MetadataBatch send_trailing_metadata_; MetadataBatch recv_initial_metadata_; MetadataBatch recv_trailing_metadata_; }; -// Represents channel data. +/// Represents channel data. class ChannelData { public: - virtual ~ChannelData() {} + virtual ~ChannelData() { + if (peer_) gpr_free((void *)peer_); + } + // Caller does NOT take ownership of result. const char *peer() const { return peer_; } // TODO(roth): Find a way to avoid passing elem into these methods. @@ -216,13 +222,14 @@ class ChannelData { grpc_channel_element *elem, TransportOp *op); protected: + /// Takes ownership of \a peer. ChannelData(const grpc_channel_args &args, const char *peer) : peer_(peer) {} private: - const char *peer_; // Do not own. + const char *peer_; }; -// Represents call data. +/// Represents call data. class CallData { public: virtual ~CallData() {} @@ -330,11 +337,11 @@ void ChannelFilterPluginShutdown(); } // namespace internal -// Registers a new filter. -// Must be called by only one thread at a time. -// The include_filter argument specifies a function that will be called -// to determine at run-time whether or not to add the filter. If the -// value is nullptr, the filter will be added unconditionally. +/// Registers a new filter. +/// Must be called by only one thread at a time. +/// The \a include_filter argument specifies a function that will be called +/// to determine at run-time whether or not to add the filter. If the +/// value is nullptr, the filter will be added unconditionally. template void RegisterChannelFilter( const char *name, grpc_channel_stack_type stack_type, int priority, @@ -346,7 +353,7 @@ void RegisterChannelFilter( internal::ChannelFilterPluginShutdown); internal::channel_filters = new std::vector(); } - // Add an entry to channel_filters. The filter will be added when the + // Add an entry to channel_filters. The filter will be added when the // C-core initialization code calls ChannelFilterPluginInit(). typedef internal::ChannelFilter FilterType; internal::FilterRecord filter_record = { -- cgit v1.2.3 From f9d8e9064ba9aa20c944441f4890866ff3b98141 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Thu, 28 Jul 2016 09:31:56 -0700 Subject: Code review changes. --- src/core/lib/channel/context.h | 14 +++++++------- src/cpp/common/channel_filter.cc | 10 +--------- src/cpp/common/channel_filter.h | 27 ++++++++++++++++++++------- 3 files changed, 28 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 22f4cb62f3..071c5f695c 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -34,17 +34,17 @@ #ifndef GRPC_CORE_LIB_CHANNEL_CONTEXT_H #define GRPC_CORE_LIB_CHANNEL_CONTEXT_H -// Call object context pointers. +/// Call object context pointers. -// Call context is represented as an array of grpc_call_context_elements. -// This enum represents the indexes into the array, where each index -// contains a different type of value. +/// Call context is represented as an array of \a grpc_call_context_elements. +/// This enum represents the indexes into the array, where each index +/// contains a different type of value. typedef enum { - // Value is either a grpc_client_security_context or a - // grpc_server_security_context. + /// Value is either a \a grpc_client_security_context or a + /// \a grpc_server_security_context. GRPC_CONTEXT_SECURITY = 0, - // Value is a census_context. + /// Value is a \a census_context. GRPC_CONTEXT_TRACING, GRPC_CONTEXT_COUNT diff --git a/src/cpp/common/channel_filter.cc b/src/cpp/common/channel_filter.cc index 8a4149bbca..25cd49cb7c 100644 --- a/src/cpp/common/channel_filter.cc +++ b/src/cpp/common/channel_filter.cc @@ -38,9 +38,7 @@ namespace grpc { -// // MetadataBatch -// grpc_linked_mdelem *MetadataBatch::AddMetadata(const string &key, const string &value) { @@ -51,9 +49,7 @@ grpc_linked_mdelem *MetadataBatch::AddMetadata(const string &key, return storage; } -// // ChannelData -// void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -61,9 +57,7 @@ void ChannelData::StartTransportOp(grpc_exec_ctx *exec_ctx, grpc_channel_next_op(exec_ctx, elem, op->op()); } -// // CallData -// void CallData::StartTransportStreamOp(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, @@ -81,9 +75,7 @@ char *CallData::GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return grpc_call_next_get_peer(exec_ctx, elem); } -// -// RegisterChannelFilter() -// +// internal code used by RegisterChannelFilter() namespace internal { diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index f3cbdb6224..6a405f567c 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -47,7 +47,6 @@ #include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/metadata_batch.h" -/// /// An interface to define filters. /// /// To define a filter, implement a subclass of each of \c CallData and @@ -56,20 +55,22 @@ /// RegisterChannelFilter( /// "name-of-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr); /// \endcode -/// namespace grpc { /// A C++ wrapper for the \c grpc_metadata_batch struct. class MetadataBatch { public: + /// Borrows a pointer to \a batch, but does NOT take ownership. + /// The caller must ensure that \a batch continues to exist for as + /// long as the MetadataBatch object does. explicit MetadataBatch(grpc_metadata_batch *batch) : batch_(batch) {} grpc_metadata_batch *batch() const { return batch_; } - // Adds metadata and returns the newly allocated storage. - // The caller takes ownership of the result, which must exist for the - // lifetime of the gRPC call. + /// Adds metadata and returns the newly allocated storage. + /// The caller takes ownership of the result, which must exist for the + /// lifetime of the gRPC call. grpc_linked_mdelem *AddMetadata(const string &key, const string &value); class const_iterator : public std::iteratorsend_initial_metadata), @@ -185,7 +192,7 @@ class TransportStreamOp { op_->send_message = send_message; } - // To be called only on clients and servers, respectively. + /// To be called only on clients and servers, respectively. grpc_client_security_context *client_security_context() const { return (grpc_client_security_context *)op_->context[GRPC_CONTEXT_SECURITY] .value; @@ -214,7 +221,7 @@ class ChannelData { if (peer_) gpr_free((void *)peer_); } - // Caller does NOT take ownership of result. + /// Caller does NOT take ownership of result. const char *peer() const { return peer_; } // TODO(roth): Find a way to avoid passing elem into these methods. @@ -234,18 +241,22 @@ class CallData { public: virtual ~CallData() {} + /// Initializes the call data. virtual grpc_error *Init() { return GRPC_ERROR_NONE; } // TODO(roth): Find a way to avoid passing elem into these methods. + /// Starts a new stream operation. virtual void StartTransportStreamOp(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, TransportStreamOp *op); + /// Sets a pollset or pollset set. virtual void SetPollsetOrPollsetSet(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_polling_entity *pollent); + /// Gets the peer name. virtual char *GetPeer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); protected: @@ -255,6 +266,8 @@ class CallData { namespace internal { // Defines static members for passing to C core. +// Members of this class correspond to the members of the C +// grpc_channel_filter struct. template class ChannelFilter GRPC_FINAL { public: -- cgit v1.2.3 From 3b4f99549db1ef93b67c4fca97d1ed84c1cd0825 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Tue, 2 Aug 2016 14:17:11 -0700 Subject: Update destroy_call_elem args from merge. --- src/cpp/common/channel_filter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index 6a405f567c..b7861b40f0 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -311,7 +311,7 @@ class ChannelFilter GRPC_FINAL { static void DestroyCallElement(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_stats *stats, + const grpc_call_final_info* final_info, void *and_free_memory) { reinterpret_cast(elem->call_data)->~CallDataType(); } -- cgit v1.2.3 From 00e9c3bb3bfd6ef556dbee92ba982ea71238af40 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 3 Aug 2016 10:48:30 -0700 Subject: clang-format --- src/cpp/common/channel_filter.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/cpp/common/channel_filter.h b/src/cpp/common/channel_filter.h index b7861b40f0..3fbacd824c 100644 --- a/src/cpp/common/channel_filter.h +++ b/src/cpp/common/channel_filter.h @@ -311,7 +311,7 @@ class ChannelFilter GRPC_FINAL { static void DestroyCallElement(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_final_info* final_info, + const grpc_call_final_info *final_info, void *and_free_memory) { reinterpret_cast(elem->call_data)->~CallDataType(); } -- cgit v1.2.3 From de84d566b8fad6808e5263a25a17fa231cb5713c Mon Sep 17 00:00:00 2001 From: siddharthshukla Date: Wed, 3 Aug 2016 17:55:10 +0200 Subject: Fix the ThreadPoolExecutor: max_workers can't be 0 Add a RecordingThreadPool that inherits from Executor, contains a ThreadPoolExecutor and has an extra method 'was_used' to indicate if submit method was ever called i.e. if the thread pool was ever used. --- .../tests/unit/_channel_connectivity_test.py | 10 +++-- .../tests/unit/_channel_ready_future_test.py | 6 ++- src/python/grpcio_tests/tests/unit/_thread_pool.py | 48 ++++++++++++++++++++++ 3 files changed, 59 insertions(+), 5 deletions(-) create mode 100644 src/python/grpcio_tests/tests/unit/_thread_pool.py (limited to 'src') diff --git a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py index 3c00f686ce..9cae96a00d 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_connectivity_test.py @@ -32,12 +32,12 @@ import threading import time import unittest -from concurrent import futures import grpc from grpc import _channel from grpc import _server from tests.unit.framework.common import test_constants +from tests.unit import _thread_pool def _ready_in_connectivities(connectivities): @@ -104,7 +104,8 @@ class ChannelConnectivityTest(unittest.TestCase): grpc.ChannelConnectivity.READY, fifth_connectivities) def test_immediately_connectable_channel_connectivity(self): - server = _server.Server(futures.ThreadPoolExecutor(max_workers=0), ()) + thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) + server = _server.Server(thread_pool, ()) port = server.add_insecure_port('[::]:0') server.start() first_callback = _Callback() @@ -141,9 +142,11 @@ class ChannelConnectivityTest(unittest.TestCase): fourth_connectivities) self.assertNotIn( grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities) + self.assertFalse(thread_pool.was_used()) def test_reachable_then_unreachable_channel_connectivity(self): - server = _server.Server(futures.ThreadPoolExecutor(max_workers=0), ()) + thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) + server = _server.Server(thread_pool, ()) port = server.add_insecure_port('[::]:0') server.start() callback = _Callback() @@ -155,6 +158,7 @@ class ChannelConnectivityTest(unittest.TestCase): server.stop(None) callback.block_until_connectivities_satisfy(_last_connectivity_is_not_ready) channel.unsubscribe(callback.update) + self.assertFalse(thread_pool.was_used()) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py index e8982ed2de..24f5b45b18 100644 --- a/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py +++ b/src/python/grpcio_tests/tests/unit/_channel_ready_future_test.py @@ -31,12 +31,12 @@ import threading import unittest -from concurrent import futures import grpc from grpc import _channel from grpc import _server from tests.unit.framework.common import test_constants +from tests.unit import _thread_pool class _Callback(object): @@ -78,7 +78,8 @@ class ChannelReadyFutureTest(unittest.TestCase): self.assertFalse(ready_future.running()) def test_immediately_connectable_channel_connectivity(self): - server = _server.Server(futures.ThreadPoolExecutor(max_workers=0), ()) + thread_pool = _thread_pool.RecordingThreadPool(max_workers=None) + server = _server.Server(thread_pool, ()) port = server.add_insecure_port('[::]:0') server.start() channel = grpc.insecure_channel('localhost:{}'.format(port)) @@ -97,6 +98,7 @@ class ChannelReadyFutureTest(unittest.TestCase): self.assertFalse(ready_future.cancelled()) self.assertTrue(ready_future.done()) self.assertFalse(ready_future.running()) + self.assertFalse(thread_pool.was_used()) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/_thread_pool.py b/src/python/grpcio_tests/tests/unit/_thread_pool.py new file mode 100644 index 0000000000..f13cc2f86f --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_thread_pool.py @@ -0,0 +1,48 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import threading +from concurrent import futures + + +class RecordingThreadPool(futures.Executor): + """A thread pool that records if used.""" + def __init__(self, max_workers): + self._tp_executor = futures.ThreadPoolExecutor(max_workers=max_workers) + self._lock = threading.Lock() + self._was_used = False + + def submit(self, fn, *args, **kwargs): + with self._lock: + self._was_used = True + self._tp_executor.submit(fn, *args, **kwargs) + + def was_used(self): + with self._lock: + return self._was_used -- cgit v1.2.3 From 2d33d78ac5d5f5eca862bdbb0dd6a2e986b6ddbc Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Tue, 9 Aug 2016 00:50:59 -0500 Subject: Proxy for @ghemawat: Removed an unnecessary error allocation from chttp2 code. --- .../ext/transport/chttp2/transport/chttp2_transport.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 6bd65cea02..751bc8d72e 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -94,7 +94,8 @@ static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error); + grpc_chttp2_transport *t, grpc_error *error, + const char *reason); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -876,7 +877,7 @@ static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "start_writing:nothing_to_write"); } - end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write")); + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_NONE, "Nothing to write"); if (t->ep && !t->endpoint_reading) { destroy_endpoint(exec_ctx, t); } @@ -925,11 +926,18 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } +/* error may be GRPC_ERROR_NONE if there is no error allocated yet. + In that case, use "reason" as the text for a new error. */ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, grpc_error *error) { + grpc_chttp2_transport *t, grpc_error *error, + const char *reason) { grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { + if (error == GRPC_ERROR_NONE && reason != NULL) { + /* create error object. */ + error = GRPC_ERROR_CREATE(reason); + } fail_pending_writes(exec_ctx, &t->global, stream_global, GRPC_ERROR_REF(error)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); @@ -951,7 +959,7 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - end_waiting_for_write(exec_ctx, t, error); + end_waiting_for_write(exec_ctx, t, error, NULL); switch (t->executor.write_state) { case GRPC_CHTTP2_WRITING_INACTIVE: -- cgit v1.2.3 From 40160d17a8afa4f8b394a61c78313684e9231c9c Mon Sep 17 00:00:00 2001 From: Vijay Pai Date: Mon, 8 Aug 2016 22:55:16 -0700 Subject: clang-format --- src/core/ext/transport/chttp2/transport/chttp2_transport.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 751bc8d72e..28768f57f0 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -95,7 +95,7 @@ static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error, - const char *reason); + const char *reason); /** Set a transport level setting, and push it to our peer */ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -930,7 +930,7 @@ static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, In that case, use "reason" as the text for a new error. */ static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error, - const char *reason) { + const char *reason) { grpc_chttp2_stream_global *stream_global; while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, &stream_global)) { -- cgit v1.2.3