From f0397933b007e2614ba38fc98f0ee6391a2eea9d Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Thu, 31 May 2018 19:39:52 -0700 Subject: Fathom TCP level changes. TracedBuffer for keeping track of all buffers to be traced. Adding tests for Fathom and TracedBuffer. A lot more. Please read PR description. --- src/core/lib/iomgr/port.h | 1 + 1 file changed, 1 insertion(+) (limited to 'src/core/lib/iomgr/port.h') diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 80d8e63cdd..a06e9f827b 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,6 +60,7 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_LINUX_ERRQUEUE 1 #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 -- cgit v1.2.3 From ac50c81a08025e36dd5a6e9127c98e08d3195c9c Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 13 Aug 2018 14:28:36 -0700 Subject: Try --- src/core/lib/iomgr/internal_errqueue.h | 30 ------------------------------ src/core/lib/iomgr/port.h | 4 ++++ 2 files changed, 4 insertions(+), 30 deletions(-) (limited to 'src/core/lib/iomgr/port.h') diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h index bbe3377b43..50037bf0e9 100644 --- a/src/core/lib/iomgr/internal_errqueue.h +++ b/src/core/lib/iomgr/internal_errqueue.h @@ -41,36 +41,6 @@ #endif /* GRPC_LINUX_ERRQUEUE */ namespace grpc_core { -/* Redefining scm_timestamping in the same way that defines - * it, so that code compiles on systems that don't have it. */ -struct scm_timestamping { - struct timespec ts[3]; -}; - -/* Also redefine timestamp types */ -/* The timestamp type for when the driver passed skb to NIC, or HW. */ -constexpr int SCM_TSTAMP_SND = 0; -/* The timestamp type for when data entered the packet scheduler. */ -constexpr int SCM_TSTAMP_SCHED = 1; -/* The timestamp type for when data acknowledged by peer. */ -constexpr int SCM_TSTAMP_ACK = 2; - -/* Redefine required constants from */ -constexpr uint32_t SOF_TIMESTAMPING_TX_SOFTWARE = 1u << 1; -constexpr uint32_t SOF_TIMESTAMPING_SOFTWARE = 1u << 4; -constexpr uint32_t SOF_TIMESTAMPING_OPT_ID = 1u << 7; -constexpr uint32_t SOF_TIMESTAMPING_TX_SCHED = 1u << 8; -constexpr uint32_t SOF_TIMESTAMPING_TX_ACK = 1u << 9; -constexpr uint32_t SOF_TIMESTAMPING_OPT_TSONLY = 1u << 11; - -constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE | - SOF_TIMESTAMPING_OPT_ID | - SOF_TIMESTAMPING_OPT_TSONLY; - -constexpr uint32_t kTimestampingRecordingOptions = - SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | - SOF_TIMESTAMPING_TX_ACK; - /* Returns true if kernel is capable of supporting errqueue and timestamping. * Currently allowing only linux kernels above 4.0.0 */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index b68305ce0e..375f1844ac 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,7 +60,11 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#ifdef LINUX_VERSION_CODE +#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) #define GRPC_LINUX_ERRQUEUE 1 +#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */ +#endif /* LINUX_VERSION_CODE */ #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 -- cgit v1.2.3 From b49f47d4f26b48392950029762cd33e6365657c6 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 13 Aug 2018 16:28:53 -0700 Subject: linux version needs to be defined --- src/core/lib/iomgr/internal_errqueue.h | 10 ++++++++++ src/core/lib/iomgr/port.h | 1 + src/core/lib/iomgr/tcp_posix.cc | 3 +-- test/core/iomgr/buffer_list_test.cc | 6 +++--- 4 files changed, 15 insertions(+), 5 deletions(-) (limited to 'src/core/lib/iomgr/port.h') diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h index 50037bf0e9..fc11be9a6d 100644 --- a/src/core/lib/iomgr/internal_errqueue.h +++ b/src/core/lib/iomgr/internal_errqueue.h @@ -41,6 +41,16 @@ #endif /* GRPC_LINUX_ERRQUEUE */ namespace grpc_core { + +#ifdef GRPC_LINUX_ERRQUEUE +constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE | + SOF_TIMESTAMPING_OPT_ID | + SOF_TIMESTAMPING_OPT_TSONLY; +constexpr uint32_t kTimestampingRecordingOptions = + SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | + SOF_TIMESTAMPING_TX_ACK; +#endif /* GRPC_LINUX_ERRQUEUE */ + /* Returns true if kernel is capable of supporting errqueue and timestamping. * Currently allowing only linux kernels above 4.0.0 */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 375f1844ac..4d728a75fb 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,6 +60,7 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#include #ifdef LINUX_VERSION_CODE #if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) #define GRPC_LINUX_ERRQUEUE 1 diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index d8f58408c7..6d4c096217 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -633,8 +633,7 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, return cmsg; } - auto tss = - reinterpret_cast(CMSG_DATA(cmsg)); + auto tss = reinterpret_cast(CMSG_DATA(cmsg)); auto serr = reinterpret_cast(CMSG_DATA(next_cmsg)); if (serr->ee_errno != ENOMSG || serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc index 9ffb71c85f..f1773580bd 100644 --- a/test/core/iomgr/buffer_list_test.cc +++ b/test/core/iomgr/buffer_list_test.cc @@ -75,8 +75,8 @@ static void TestVerifierCalledOnAckVerifier(void* arg, static void TestVerifierCalledOnAck() { struct sock_extended_err serr; serr.ee_data = 213; - serr.ee_info = grpc_core::SCM_TSTAMP_ACK; - struct grpc_core::scm_timestamping tss; + serr.ee_info = SCM_TSTAMP_ACK; + struct scm_timestamping tss; tss.ts[0].tv_sec = 123; tss.ts[0].tv_nsec = 456; grpc_core::grpc_tcp_set_write_timestamps_callback( @@ -106,6 +106,6 @@ int main(int argc, char** argv) { #else /* GRPC_LINUX_ERRQUEUE */ -int main(int argc, char** argv) { return 1; } +int main(int argc, char** argv) { return 0; } #endif /* GRPC_LINUX_ERRQUEUE */ -- cgit v1.2.3 From 97ba943f179cf6eda6f1a85abee01d459f507673 Mon Sep 17 00:00:00 2001 From: Jan Tattermusch Date: Mon, 27 Aug 2018 12:43:20 +0200 Subject: Revert "Fathom tcp changes" --- BUILD | 4 - CMakeLists.txt | 46 --- Makefile | 48 ---- build.yaml | 18 -- config.m4 | 2 - config.w32 | 2 - gRPC-C++.podspec | 4 - gRPC-Core.podspec | 6 - grpc.gemspec | 4 - grpc.gyp | 8 - package.xml | 4 - .../client_channel/http_connect_handshaker.cc | 2 +- .../chttp2/client/insecure/channel_create_posix.cc | 2 +- .../chttp2/server/insecure/server_chttp2_posix.cc | 2 +- .../transport/chttp2/transport/chttp2_transport.cc | 3 +- src/core/lib/http/httpcli.cc | 2 +- src/core/lib/iomgr/buffer_list.cc | 134 --------- src/core/lib/iomgr/buffer_list.h | 96 ------- src/core/lib/iomgr/endpoint.cc | 4 +- src/core/lib/iomgr/endpoint.h | 8 +- src/core/lib/iomgr/endpoint_cfstream.cc | 2 +- src/core/lib/iomgr/endpoint_pair_posix.cc | 4 +- src/core/lib/iomgr/ev_posix.cc | 9 +- src/core/lib/iomgr/internal_errqueue.cc | 40 --- src/core/lib/iomgr/internal_errqueue.h | 62 ---- src/core/lib/iomgr/port.h | 6 - src/core/lib/iomgr/tcp_client_posix.cc | 2 +- src/core/lib/iomgr/tcp_custom.cc | 2 +- src/core/lib/iomgr/tcp_posix.cc | 311 +-------------------- src/core/lib/iomgr/tcp_posix.h | 3 - src/core/lib/iomgr/tcp_server_posix.cc | 4 +- .../lib/iomgr/tcp_server_utils_posix_common.cc | 2 +- src/core/lib/iomgr/tcp_windows.cc | 2 +- src/core/lib/iomgr/udp_server.cc | 2 +- src/core/lib/security/transport/secure_endpoint.cc | 4 +- .../lib/security/transport/security_handshaker.cc | 2 +- src/python/grpcio/grpc_core_dependencies.py | 2 - test/core/bad_client/bad_client.cc | 2 +- test/core/end2end/bad_server_response_test.cc | 2 +- test/core/end2end/fixtures/http_proxy_fixture.cc | 10 +- test/core/iomgr/BUILD | 13 - test/core/iomgr/buffer_list_test.cc | 111 -------- test/core/iomgr/endpoint_tests.cc | 7 +- test/core/iomgr/tcp_posix_test.cc | 109 ++------ test/core/util/mock_endpoint.cc | 2 +- test/core/util/passthru_endpoint.cc | 2 +- test/core/util/trickle_endpoint.cc | 5 +- test/cpp/microbenchmarks/bm_chttp2_transport.cc | 2 +- tools/doxygen/Doxyfile.c++.internal | 2 - tools/doxygen/Doxyfile.core.internal | 4 - tools/run_tests/generated/sources_and_headers.json | 23 -- tools/run_tests/generated/tests.json | 20 -- 52 files changed, 68 insertions(+), 1104 deletions(-) delete mode 100644 src/core/lib/iomgr/buffer_list.cc delete mode 100644 src/core/lib/iomgr/buffer_list.h delete mode 100644 src/core/lib/iomgr/internal_errqueue.cc delete mode 100644 src/core/lib/iomgr/internal_errqueue.h delete mode 100644 test/core/iomgr/buffer_list_test.cc (limited to 'src/core/lib/iomgr/port.h') diff --git a/BUILD b/BUILD index c01b971281..e1b00a5cf0 100644 --- a/BUILD +++ b/BUILD @@ -696,7 +696,6 @@ grpc_cc_library( "src/core/lib/http/format_request.cc", "src/core/lib/http/httpcli.cc", "src/core/lib/http/parser.cc", - "src/core/lib/iomgr/buffer_list.cc", "src/core/lib/iomgr/call_combiner.cc", "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/endpoint.cc", @@ -717,7 +716,6 @@ grpc_cc_library( "src/core/lib/iomgr/gethostname_fallback.cc", "src/core/lib/iomgr/gethostname_host_name_max.cc", "src/core/lib/iomgr/gethostname_sysconf.cc", - "src/core/lib/iomgr/internal_errqueue.cc", "src/core/lib/iomgr/iocp_windows.cc", "src/core/lib/iomgr/iomgr.cc", "src/core/lib/iomgr/iomgr_custom.cc", @@ -847,7 +845,6 @@ grpc_cc_library( "src/core/lib/http/format_request.h", "src/core/lib/http/httpcli.h", "src/core/lib/http/parser.h", - "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/block_annotate.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/closure.h", @@ -865,7 +862,6 @@ grpc_cc_library( "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/gevent_util.h", - "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 3895d4c0f1..a330fefc27 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -230,9 +230,6 @@ add_dependencies(buildtests_c avl_test) add_dependencies(buildtests_c bad_server_response_test) add_dependencies(buildtests_c bin_decoder_test) add_dependencies(buildtests_c bin_encoder_test) -if(_gRPC_PLATFORM_LINUX) -add_dependencies(buildtests_c buffer_list_test) -endif() add_dependencies(buildtests_c channel_create_test) add_dependencies(buildtests_c chttp2_hpack_encoder_test) add_dependencies(buildtests_c chttp2_stream_map_test) @@ -962,7 +959,6 @@ add_library(grpc src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc - src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -983,7 +979,6 @@ add_library(grpc src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc - src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -1370,7 +1365,6 @@ add_library(grpc_cronet src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc - src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -1391,7 +1385,6 @@ add_library(grpc_cronet src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc - src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -1764,7 +1757,6 @@ add_library(grpc_test_util src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc - src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -1785,7 +1777,6 @@ add_library(grpc_test_util src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc - src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -2074,7 +2065,6 @@ add_library(grpc_test_util_unsecure src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc - src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -2095,7 +2085,6 @@ add_library(grpc_test_util_unsecure src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc - src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -2363,7 +2352,6 @@ add_library(grpc_unsecure src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc - src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -2384,7 +2372,6 @@ add_library(grpc_unsecure src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc - src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -3205,7 +3192,6 @@ add_library(grpc++_cronet src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc - src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -3226,7 +3212,6 @@ add_library(grpc++_cronet src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc - src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -5850,37 +5835,6 @@ target_link_libraries(bin_encoder_test grpc ) -endif (gRPC_BUILD_TESTS) -if (gRPC_BUILD_TESTS) -if(_gRPC_PLATFORM_LINUX) - -add_executable(buffer_list_test - test/core/iomgr/buffer_list_test.cc -) - - -target_include_directories(buffer_list_test - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} - PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include - PRIVATE ${_gRPC_SSL_INCLUDE_DIR} - PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} - PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} - PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} - PRIVATE ${_gRPC_CARES_INCLUDE_DIR} - PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} - PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} -) - -target_link_libraries(buffer_list_test - ${_gRPC_ALLTARGETS_LIBRARIES} - grpc_test_util - grpc - gpr_test_util - gpr -) - -endif() endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index 7caf585ee9..e9a9486a3c 100644 --- a/Makefile +++ b/Makefile @@ -978,7 +978,6 @@ avl_test: $(BINDIR)/$(CONFIG)/avl_test bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test -buffer_list_test: $(BINDIR)/$(CONFIG)/buffer_list_test channel_create_test: $(BINDIR)/$(CONFIG)/channel_create_test check_epollexclusive: $(BINDIR)/$(CONFIG)/check_epollexclusive chttp2_hpack_encoder_test: $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test @@ -1435,7 +1434,6 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/bad_server_response_test \ $(BINDIR)/$(CONFIG)/bin_decoder_test \ $(BINDIR)/$(CONFIG)/bin_encoder_test \ - $(BINDIR)/$(CONFIG)/buffer_list_test \ $(BINDIR)/$(CONFIG)/channel_create_test \ $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test \ $(BINDIR)/$(CONFIG)/chttp2_stream_map_test \ @@ -1952,8 +1950,6 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 ) $(E) "[RUN] Testing bin_encoder_test" $(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 ) - $(E) "[RUN] Testing buffer_list_test" - $(Q) $(BINDIR)/$(CONFIG)/buffer_list_test || ( echo test buffer_list_test failed ; exit 1 ) $(E) "[RUN] Testing channel_create_test" $(Q) $(BINDIR)/$(CONFIG)/channel_create_test || ( echo test channel_create_test failed ; exit 1 ) $(E) "[RUN] Testing chttp2_hpack_encoder_test" @@ -3464,7 +3460,6 @@ LIBGRPC_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ - src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -3485,7 +3480,6 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ - src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -3871,7 +3865,6 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ - src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -3892,7 +3885,6 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ - src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -4263,7 +4255,6 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ - src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -4284,7 +4275,6 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ - src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -4564,7 +4554,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ - src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -4585,7 +4574,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ - src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -4831,7 +4819,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ - src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -4852,7 +4839,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ - src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -5661,7 +5647,6 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ - src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -5682,7 +5667,6 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ - src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -10717,38 +10701,6 @@ endif endif -BUFFER_LIST_TEST_SRC = \ - test/core/iomgr/buffer_list_test.cc \ - -BUFFER_LIST_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BUFFER_LIST_TEST_SRC)))) -ifeq ($(NO_SECURE),true) - -# You can't build secure targets if you don't have OpenSSL. - -$(BINDIR)/$(CONFIG)/buffer_list_test: openssl_dep_error - -else - - - -$(BINDIR)/$(CONFIG)/buffer_list_test: $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - $(E) "[LD] Linking $@" - $(Q) mkdir -p `dirname $@` - $(Q) $(LD) $(LDFLAGS) $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/buffer_list_test - -endif - -$(OBJDIR)/$(CONFIG)/test/core/iomgr/buffer_list_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a - -deps_buffer_list_test: $(BUFFER_LIST_TEST_OBJS:.o=.dep) - -ifneq ($(NO_SECURE),true) -ifneq ($(NO_DEPS),true) --include $(BUFFER_LIST_TEST_OBJS:.o=.dep) -endif -endif - - CHANNEL_CREATE_TEST_SRC = \ test/core/surface/channel_create_test.cc \ diff --git a/build.yaml b/build.yaml index a7ebee7572..ce21068c31 100644 --- a/build.yaml +++ b/build.yaml @@ -256,7 +256,6 @@ filegroups: - src/core/lib/http/format_request.cc - src/core/lib/http/httpcli.cc - src/core/lib/http/parser.cc - - src/core/lib/iomgr/buffer_list.cc - src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/combiner.cc - src/core/lib/iomgr/endpoint.cc @@ -277,7 +276,6 @@ filegroups: - src/core/lib/iomgr/gethostname_fallback.cc - src/core/lib/iomgr/gethostname_host_name_max.cc - src/core/lib/iomgr/gethostname_sysconf.cc - - src/core/lib/iomgr/internal_errqueue.cc - src/core/lib/iomgr/iocp_windows.cc - src/core/lib/iomgr/iomgr.cc - src/core/lib/iomgr/iomgr_custom.cc @@ -436,7 +434,6 @@ filegroups: - src/core/lib/http/httpcli.h - src/core/lib/http/parser.h - src/core/lib/iomgr/block_annotate.h - - src/core/lib/iomgr/buffer_list.h - src/core/lib/iomgr/call_combiner.h - src/core/lib/iomgr/closure.h - src/core/lib/iomgr/combiner.h @@ -452,7 +449,6 @@ filegroups: - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - src/core/lib/iomgr/gethostname.h - - src/core/lib/iomgr/internal_errqueue.h - src/core/lib/iomgr/iocp_windows.h - src/core/lib/iomgr/iomgr.h - src/core/lib/iomgr/iomgr_custom.h @@ -2143,20 +2139,6 @@ targets: - grpc_test_util - grpc uses_polling: false -- name: buffer_list_test - build: test - language: c - src: - - test/core/iomgr/buffer_list_test.cc - deps: - - grpc_test_util - - grpc - - gpr_test_util - - gpr - exclude_iomgrs: - - uv - platforms: - - linux - name: channel_create_test build: test language: c diff --git a/config.m4 b/config.m4 index af3624cdd1..7825274eea 100644 --- a/config.m4 +++ b/config.m4 @@ -108,7 +108,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ - src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -129,7 +128,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ - src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ diff --git a/config.w32 b/config.w32 index ad91ee40bd..a9d1e6c9d0 100644 --- a/config.w32 +++ b/config.w32 @@ -83,7 +83,6 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\http\\format_request.cc " + "src\\core\\lib\\http\\httpcli.cc " + "src\\core\\lib\\http\\parser.cc " + - "src\\core\\lib\\iomgr\\buffer_list.cc " + "src\\core\\lib\\iomgr\\call_combiner.cc " + "src\\core\\lib\\iomgr\\combiner.cc " + "src\\core\\lib\\iomgr\\endpoint.cc " + @@ -104,7 +103,6 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\gethostname_fallback.cc " + "src\\core\\lib\\iomgr\\gethostname_host_name_max.cc " + "src\\core\\lib\\iomgr\\gethostname_sysconf.cc " + - "src\\core\\lib\\iomgr\\internal_errqueue.cc " + "src\\core\\lib\\iomgr\\iocp_windows.cc " + "src\\core\\lib\\iomgr\\iomgr.cc " + "src\\core\\lib\\iomgr\\iomgr_custom.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index a387794f57..207d7baa8f 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -382,7 +382,6 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', - 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -398,7 +397,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', - 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', @@ -572,7 +570,6 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', - 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -588,7 +585,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', - 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 9832283e5c..f1a9bce7f2 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -394,7 +394,6 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', - 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -410,7 +409,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', - 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', @@ -540,7 +538,6 @@ Pod::Spec.new do |s| 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', - 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -561,7 +558,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', - 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -997,7 +993,6 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', - 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -1013,7 +1008,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', - 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', diff --git a/grpc.gemspec b/grpc.gemspec index c8e58faec9..5c38071d15 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -330,7 +330,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/http/httpcli.h ) s.files += %w( src/core/lib/http/parser.h ) s.files += %w( src/core/lib/iomgr/block_annotate.h ) - s.files += %w( src/core/lib/iomgr/buffer_list.h ) s.files += %w( src/core/lib/iomgr/call_combiner.h ) s.files += %w( src/core/lib/iomgr/closure.h ) s.files += %w( src/core/lib/iomgr/combiner.h ) @@ -346,7 +345,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/exec_ctx.h ) s.files += %w( src/core/lib/iomgr/executor.h ) s.files += %w( src/core/lib/iomgr/gethostname.h ) - s.files += %w( src/core/lib/iomgr/internal_errqueue.h ) s.files += %w( src/core/lib/iomgr/iocp_windows.h ) s.files += %w( src/core/lib/iomgr/iomgr.h ) s.files += %w( src/core/lib/iomgr/iomgr_custom.h ) @@ -476,7 +474,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/http/format_request.cc ) s.files += %w( src/core/lib/http/httpcli.cc ) s.files += %w( src/core/lib/http/parser.cc ) - s.files += %w( src/core/lib/iomgr/buffer_list.cc ) s.files += %w( src/core/lib/iomgr/call_combiner.cc ) s.files += %w( src/core/lib/iomgr/combiner.cc ) s.files += %w( src/core/lib/iomgr/endpoint.cc ) @@ -497,7 +494,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/gethostname_fallback.cc ) s.files += %w( src/core/lib/iomgr/gethostname_host_name_max.cc ) s.files += %w( src/core/lib/iomgr/gethostname_sysconf.cc ) - s.files += %w( src/core/lib/iomgr/internal_errqueue.cc ) s.files += %w( src/core/lib/iomgr/iocp_windows.cc ) s.files += %w( src/core/lib/iomgr/iomgr.cc ) s.files += %w( src/core/lib/iomgr/iomgr_custom.cc ) diff --git a/grpc.gyp b/grpc.gyp index 654a531092..a36998bcb3 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -300,7 +300,6 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', - 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -321,7 +320,6 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', - 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -662,7 +660,6 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', - 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -683,7 +680,6 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', - 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -897,7 +893,6 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', - 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -918,7 +913,6 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', - 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -1110,7 +1104,6 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', - 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -1131,7 +1124,6 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', - 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', diff --git a/package.xml b/package.xml index 537c5404e7..1bdf127c36 100644 --- a/package.xml +++ b/package.xml @@ -335,7 +335,6 @@ - @@ -351,7 +350,6 @@ - @@ -481,7 +479,6 @@ - @@ -502,7 +499,6 @@ - diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 7ce8da8c00..4e8b8b71db 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -320,7 +320,7 @@ static void http_connect_handshaker_do_handshake( // Take a new ref to be held by the write callback. gpr_ref(&handshaker->refcount); grpc_endpoint_write(args->endpoint, &handshaker->write_buffer, - &handshaker->request_done_closure, nullptr); + &handshaker->request_done_closure); gpr_mu_unlock(&handshaker->mu); } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index 5bdcb387c9..dfed824cd5 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client", true), args, "fd-client"); + grpc_fd_create(fd, "client", false), args, "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index e4bd91d07b..a0228785ee 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -44,7 +44,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, gpr_asprintf(&name, "fd:%d", fd); grpc_endpoint* server_endpoint = - grpc_tcp_create(grpc_fd_create(fd, name, true), + grpc_tcp_create(grpc_fd_create(fd, name, false), grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 027a57d606..36511fa608 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1029,8 +1029,7 @@ static void write_action(void* gt, grpc_error* error) { grpc_endpoint_write( t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner)), - nullptr); + grpc_combiner_scheduler(t->combiner))); } /* Callback from the grpc_endpoint after bytes have been written by calling diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index 3bd7a2ce59..12060074c5 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -163,7 +163,7 @@ static void done_write(void* arg, grpc_error* error) { static void start_write(internal_request* req) { grpc_slice_ref_internal(req->request_text); grpc_slice_buffer_add(&req->outgoing, req->request_text); - grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr); + grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write); } static void on_handshake_done(void* arg, grpc_endpoint* ep) { diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc deleted file mode 100644 index 6ada23db1c..0000000000 --- a/src/core/lib/iomgr/buffer_list.cc +++ /dev/null @@ -1,134 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include "src/core/lib/iomgr/buffer_list.h" -#include "src/core/lib/iomgr/port.h" - -#include - -#ifdef GRPC_LINUX_ERRQUEUE -#include - -#include "src/core/lib/gprpp/memory.h" - -namespace grpc_core { -void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, - void* arg) { - GPR_DEBUG_ASSERT(head != nullptr); - TracedBuffer* new_elem = New(seq_no, arg); - /* Store the current time as the sendmsg time. */ - new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME); - if (*head == nullptr) { - *head = new_elem; - return; - } - /* Append at the end. */ - TracedBuffer* ptr = *head; - while (ptr->next_ != nullptr) { - ptr = ptr->next_; - } - ptr->next_ = new_elem; -} - -namespace { -/** Fills gpr_timespec gts based on values from timespec ts */ -void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) { - gts->tv_sec = ts->tv_sec; - gts->tv_nsec = static_cast(ts->tv_nsec); - gts->clock_type = GPR_CLOCK_REALTIME; -} - -/** The saved callback function that will be invoked when we get all the - * timestamps that we are going to get for a TracedBuffer. */ -void (*timestamps_callback)(void*, grpc_core::Timestamps*, - grpc_error* shutdown_err); -} /* namespace */ - -void TracedBuffer::ProcessTimestamp(TracedBuffer** head, - struct sock_extended_err* serr, - struct scm_timestamping* tss) { - GPR_DEBUG_ASSERT(head != nullptr); - TracedBuffer* elem = *head; - TracedBuffer* next = nullptr; - while (elem != nullptr) { - /* The byte number refers to the sequence number of the last byte which this - * timestamp relates to. */ - if (serr->ee_data >= elem->seq_no_) { - switch (serr->ee_info) { - case SCM_TSTAMP_SCHED: - fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0])); - elem = elem->next_; - break; - case SCM_TSTAMP_SND: - fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0])); - elem = elem->next_; - break; - case SCM_TSTAMP_ACK: - fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0])); - /* Got all timestamps. Do the callback and free this TracedBuffer. - * The thing below can be passed by value if we don't want the - * restriction on the lifetime. */ - timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE); - next = elem->next_; - Delete(elem); - *head = elem = next; - break; - default: - abort(); - } - } else { - break; - } - } -} - -void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) { - GPR_DEBUG_ASSERT(head != nullptr); - TracedBuffer* elem = *head; - while (elem != nullptr) { - if (timestamps_callback) { - timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err); - } - auto* next = elem->next_; - Delete(elem); - elem = next; - } - *head = nullptr; - GRPC_ERROR_UNREF(shutdown_err); -} - -void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, - grpc_core::Timestamps*, - grpc_error* error)) { - timestamps_callback = fn; -} -} /* namespace grpc_core */ - -#else /* GRPC_LINUX_ERRQUEUE */ - -namespace grpc_core { -void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, - grpc_core::Timestamps*, - grpc_error* error)) { - gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform"); -} -} /* namespace grpc_core */ - -#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h deleted file mode 100644 index cbbf50a657..0000000000 --- a/src/core/lib/iomgr/buffer_list.h +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H -#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H - -#include - -#include "src/core/lib/iomgr/port.h" - -#include - -#include "src/core/lib/gprpp/memory.h" -#include "src/core/lib/iomgr/error.h" -#include "src/core/lib/iomgr/internal_errqueue.h" - -namespace grpc_core { -struct Timestamps { - /* TODO(yashykt): This would also need to store OPTSTAT once support is added - */ - gpr_timespec sendmsg_time; - gpr_timespec scheduled_time; - gpr_timespec sent_time; - gpr_timespec acked_time; -}; - -/** TracedBuffer is a class to keep track of timestamps for a specific buffer in - * the TCP layer. We are only tracking timestamps for Linux kernels and hence - * this class would only be used by Linux platforms. For all other platforms, - * TracedBuffer would be an empty class. - * - * The timestamps collected are according to grpc_core::Timestamps declared - * above. - * - * A TracedBuffer list is kept track of using the head element of the list. If - * the head element of the list is nullptr, then the list is empty. - */ -#ifdef GRPC_LINUX_ERRQUEUE -class TracedBuffer { - public: - /** Add a new entry in the TracedBuffer list pointed to by head. Also saves - * sendmsg_time with the current timestamp. */ - static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no, - void* arg); - - /** Processes a received timestamp based on sock_extended_err and - * scm_timestamping structures. It will invoke the timestamps callback if the - * timestamp type is SCM_TSTAMP_ACK. */ - static void ProcessTimestamp(grpc_core::TracedBuffer** head, - struct sock_extended_err* serr, - struct scm_timestamping* tss); - - /** Cleans the list by calling the callback for each traced buffer in the list - * with timestamps that it has. */ - static void Shutdown(grpc_core::TracedBuffer** head, - grpc_error* shutdown_err); - - private: - GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW - - TracedBuffer(int seq_no, void* arg) - : seq_no_(seq_no), arg_(arg), next_(nullptr) {} - - uint32_t seq_no_; /* The sequence number for the last byte in the buffer */ - void* arg_; /* The arg to pass to timestamps_callback */ - grpc_core::Timestamps ts_; /* The timestamps corresponding to this buffer */ - grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */ -}; -#else /* GRPC_LINUX_ERRQUEUE */ -class TracedBuffer {}; -#endif /* GRPC_LINUX_ERRQUEUE */ - -/** Sets the callback function to call when timestamps for a write are - * collected. The callback does not own a reference to error. */ -void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, - grpc_core::Timestamps*, - grpc_error* error)); - -}; /* namespace grpc_core */ - -#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */ diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 44fb47e19d..92e7930111 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, } void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { - ep->vtable->write(ep, slices, cb, arg); + grpc_closure* cb) { + ep->vtable->write(ep, slices, cb); } void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 1f590a80ca..15db1649fa 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -33,12 +33,10 @@ typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; -class Timestamps; struct grpc_endpoint_vtable { void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); - void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, - void* arg); + void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset); void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset); void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset); @@ -72,11 +70,9 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep); \a slices may be mutated at will by the endpoint until cb is called. No guarantee is made to the content of slices after a write EXCEPT that it is a valid slice buffer. - \a arg is platform specific. It is currently only used by TCP on linux - platforms as an argument that would be forwarded to the timestamps callback. */ void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg); + grpc_closure* cb); /* Causes any pending and future read/write callbacks to run immediately with success==0 */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index df2cf508c8..c3bc0cc8fd 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -268,7 +268,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, } static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 3afbfd7254..5c5c246f99 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index d4377e2d50..0205363d5c 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -237,19 +237,14 @@ void grpc_event_engine_shutdown(void) { } bool grpc_event_engine_can_track_errors(void) { -/* Only track errors if platform supports errqueue. */ -#ifdef GRPC_LINUX_ERRQUEUE return g_event_engine->can_track_err; -#else - return false; -#endif /* GRPC_LINUX_ERRQUEUE */ } grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); - return g_event_engine->fd_create(fd, name, - track_err && g_event_engine->can_track_err); + GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err); + return g_event_engine->fd_create(fd, name, track_err); } int grpc_fd_wrapped_fd(grpc_fd* fd) { diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc deleted file mode 100644 index 8823737e49..0000000000 --- a/src/core/lib/iomgr/internal_errqueue.cc +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include - -#include "src/core/lib/iomgr/port.h" - -#include "src/core/lib/iomgr/internal_errqueue.h" - -#ifdef GRPC_POSIX_SOCKET_TCP - -#ifdef GPR_LINUX -#include -#endif /* GPR_LINUX */ - -bool kernel_supports_errqueue() { -#ifdef LINUX_VERSION_CODE -#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) - return true; -#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */ -#endif /* LINUX_VERSION_CODE */ - return false; -} - -#endif /* GRPC_POSIX_SOCKET_TCP */ diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h deleted file mode 100644 index fc11be9a6d..0000000000 --- a/src/core/lib/iomgr/internal_errqueue.h +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* This file contains constants defined in and - * so as to allow collecting network timestamps in the - * kernel. This file allows tcp_posix.cc to compile on platforms that do not - * have and . - */ - -#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H -#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H - -#include - -#include "src/core/lib/iomgr/port.h" - -#ifdef GRPC_POSIX_SOCKET_TCP - -#include -#include - -#ifdef GRPC_LINUX_ERRQUEUE -#include -#include -#include -#endif /* GRPC_LINUX_ERRQUEUE */ - -namespace grpc_core { - -#ifdef GRPC_LINUX_ERRQUEUE -constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE | - SOF_TIMESTAMPING_OPT_ID | - SOF_TIMESTAMPING_OPT_TSONLY; -constexpr uint32_t kTimestampingRecordingOptions = - SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | - SOF_TIMESTAMPING_TX_ACK; -#endif /* GRPC_LINUX_ERRQUEUE */ - -/* Returns true if kernel is capable of supporting errqueue and timestamping. - * Currently allowing only linux kernels above 4.0.0 - */ -bool kernel_supports_errqueue(); -} // namespace grpc_core - -#endif /* GRPC_POSIX_SOCKET_TCP */ - -#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index a4688fd0ef..066417b93c 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,12 +60,6 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 -#include -#ifdef LINUX_VERSION_CODE -#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) -#define GRPC_LINUX_ERRQUEUE 1 -#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */ -#endif /* LINUX_VERSION_CODE */ #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 9c989b7dfe..296ee74311 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args, } addr_str = grpc_sockaddr_to_uri(mapped_addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - *fdobj = grpc_fd_create(fd, name, true); + *fdobj = grpc_fd_create(fd, name, false); gpr_free(name); gpr_free(addr_str); return GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index e02a1898f2..990e8d632b 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -221,7 +221,7 @@ static void custom_write_callback(grpc_custom_socket* socket, } static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep; GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 1db2790265..b53ffbf01c 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -27,9 +27,7 @@ #include #include -#include #include -#include #include #include #include @@ -48,7 +46,6 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" @@ -100,42 +97,17 @@ struct grpc_tcp { grpc_closure read_done_closure; grpc_closure write_done_closure; - grpc_closure error_closure; char* peer_string; grpc_resource_user* resource_user; grpc_resource_user_slice_allocator slice_allocator; - - grpc_core::TracedBuffer* tb_head; /* List of traced buffers */ - gpr_mu tb_mu; /* Lock for access to list of traced buffers */ - - /* grpc_endpoint_write takes an argument which if non-null means that the - * transport layer wants the TCP layer to collect timestamps for this write. - * This arg is forwarded to the timestamps callback function when the ACK - * timestamp is received from the kernel. This arg is a (void *) which allows - * users of this API to pass in a pointer to any kind of structure. This - * structure could actually be a tag or any book-keeping object that the user - * can use to distinguish between different traced writes. The only - * requirement from the TCP endpoint layer is that this arg should be non-null - * if the user wants timestamps for the write. */ - void* outgoing_buffer_arg; - /* A counter which starts at 0. It is initialized the first time the socket - * options for collecting timestamps are set, and is incremented with each - * byte sent. */ - int bytes_counter; - bool socket_ts_enabled; /* True if timestamping options are set on the socket - */ - gpr_atm - stop_error_notification; /* Set to 1 if we do not want to be notified on - errors anymore */ }; struct backup_poller { gpr_mu* pollset_mu; grpc_closure run_poller; }; - } // namespace #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1)) @@ -330,7 +302,6 @@ static void tcp_free(grpc_tcp* tcp) { grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); - gpr_mu_destroy(&tcp->tb_mu); gpr_free(tcp); } @@ -376,10 +347,6 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); - if (grpc_event_engine_can_track_errors()) { - gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); - grpc_fd_set_error(tcp->em_fd); - } TCP_UNREF(tcp, "destroy"); } @@ -546,234 +513,6 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, } } -/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number - * of bytes sent. */ -ssize_t tcp_send(int fd, const struct msghdr* msg) { - GPR_TIMER_SCOPE("sendmsg", 1); - ssize_t sent_length; - do { - /* TODO(klempner): Cork if this is a partial write */ - GRPC_STATS_INC_SYSCALL_WRITE(); - sent_length = sendmsg(fd, msg, SENDMSG_FLAGS); - } while (sent_length < 0 && errno == EINTR); - return sent_length; -} - -/** This is to be called if outgoing_buffer_arg is not null. On linux platforms, - * this will call sendmsg with socket options set to collect timestamps inside - * the kernel. On return, sent_length is set to the return value of the sendmsg - * call. Returns false if setting the socket options failed. This is not - * implemented for non-linux platforms currently, and crashes out. - */ -static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, - size_t sending_length, - ssize_t* sent_length, grpc_error** error); - -/** The callback function to be invoked when we get an error on the socket. */ -static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); - -#ifdef GRPC_LINUX_ERRQUEUE -static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, - size_t sending_length, - ssize_t* sent_length, - grpc_error** error) { - if (!tcp->socket_ts_enabled) { - uint32_t opt = grpc_core::kTimestampingSocketOptions; - if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, - static_cast(&opt), sizeof(opt)) != 0) { - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp); - grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket."); - } - return false; - } - tcp->bytes_counter = -1; - tcp->socket_ts_enabled = true; - } - /* Set control message to indicate that you want timestamps. */ - union { - char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))]; - struct cmsghdr align; - } u; - cmsghdr* cmsg = reinterpret_cast(u.cmsg_buf); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SO_TIMESTAMPING; - cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t)); - *reinterpret_cast(CMSG_DATA(cmsg)) = - grpc_core::kTimestampingRecordingOptions; - msg->msg_control = u.cmsg_buf; - msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); - - /* If there was an error on sendmsg the logic in tcp_flush will handle it. */ - ssize_t length = tcp_send(tcp->fd, msg); - *sent_length = length; - /* Only save timestamps if all the bytes were taken by sendmsg. */ - if (sending_length == static_cast(length)) { - gpr_mu_lock(&tcp->tb_mu); - grpc_core::TracedBuffer::AddNewEntry( - &tcp->tb_head, static_cast(tcp->bytes_counter + length), - tcp->outgoing_buffer_arg); - gpr_mu_unlock(&tcp->tb_mu); - tcp->outgoing_buffer_arg = nullptr; - } - return true; -} - -/** Reads \a cmsg to derive timestamps from the control messages. If a valid - * timestamp is found, the traced buffer list is updated with this timestamp. - * The caller of this function should be looping on the control messages found - * in \a msg. \a cmsg should point to the control message that the caller wants - * processed. - * On return, a pointer to a control message is returned. On the next iteration, - * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */ -struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, - struct cmsghdr* cmsg) { - auto next_cmsg = CMSG_NXTHDR(msg, cmsg); - if (next_cmsg == nullptr) { - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_ERROR, "Received timestamp without extended error"); - } - return cmsg; - } - - if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) || - !(next_cmsg->cmsg_type == IP_RECVERR || - next_cmsg->cmsg_type == IPV6_RECVERR)) { - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_ERROR, "Unexpected control message"); - } - return cmsg; - } - - auto tss = reinterpret_cast(CMSG_DATA(cmsg)); - auto serr = reinterpret_cast(CMSG_DATA(next_cmsg)); - if (serr->ee_errno != ENOMSG || - serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { - gpr_log(GPR_ERROR, "Unexpected control message"); - return cmsg; - } - /* The error handling can potentially be done on another thread so we need - * to protect the traced buffer list. A lock free list might be better. Using - * a simple mutex for now. */ - gpr_mu_lock(&tcp->tb_mu); - grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss); - gpr_mu_unlock(&tcp->tb_mu); - return next_cmsg; -} - -/** For linux platforms, reads the socket's error queue and processes error - * messages from the queue. Returns true if all the errors processed were - * timestamps. Returns false if any of the errors were not timestamps. For - * non-linux platforms, error processing is not used/enabled currently. - */ -static bool process_errors(grpc_tcp* tcp) { - while (true) { - struct iovec iov; - iov.iov_base = nullptr; - iov.iov_len = 0; - struct msghdr msg; - msg.msg_name = nullptr; - msg.msg_namelen = 0; - msg.msg_iov = &iov; - msg.msg_iovlen = 0; - msg.msg_flags = 0; - - union { - char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) + - CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/]; - struct cmsghdr align; - } aligned_buf; - memset(&aligned_buf, 0, sizeof(aligned_buf)); - - msg.msg_control = aligned_buf.rbuf; - msg.msg_controllen = sizeof(aligned_buf.rbuf); - - int r, saved_errno; - do { - r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE); - saved_errno = errno; - } while (r < 0 && saved_errno == EINTR); - - if (r == -1 && saved_errno == EAGAIN) { - return true; /* No more errors to process */ - } - if (r == -1) { - return false; - } - if (grpc_tcp_trace.enabled()) { - if ((msg.msg_flags & MSG_CTRUNC) == 1) { - gpr_log(GPR_INFO, "Error message was truncated."); - } - } - - if (msg.msg_controllen == 0) { - /* There was no control message found. It was probably spurious. */ - return true; - } - for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; - cmsg = CMSG_NXTHDR(&msg, cmsg)) { - if (cmsg->cmsg_level != SOL_SOCKET || - cmsg->cmsg_type != SCM_TIMESTAMPING) { - /* Got a control message that is not a timestamp. Don't know how to - * handle this. */ - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_INFO, - "unknown control message cmsg_level:%d cmsg_type:%d", - cmsg->cmsg_level, cmsg->cmsg_type); - } - return false; - } - process_timestamp(tcp, &msg, cmsg); - } - } -} - -static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { - grpc_tcp* tcp = static_cast(arg); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); - } - - if (error != GRPC_ERROR_NONE || - static_cast(gpr_atm_acq_load(&tcp->stop_error_notification))) { - /* We aren't going to register to hear on error anymore, so it is safe to - * unref. */ - grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); - TCP_UNREF(tcp, "error-tracking"); - return; - } - - /* We are still interested in collecting timestamps, so let's try reading - * them. */ - if (!process_errors(tcp)) { - /* This was not a timestamps error. This was an actual error. Set the - * read and write closures to be ready. - */ - grpc_fd_set_readable(tcp->em_fd); - grpc_fd_set_writable(tcp->em_fd); - } - GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, - grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); -} - -#else /* GRPC_LINUX_ERRQUEUE */ -static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, - size_t sending_length, - ssize_t* sent_length, - grpc_error** error) { - gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); - GPR_ASSERT(0); - return false; -} - -static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { - gpr_log(GPR_ERROR, "Error handling is not supported for this platform"); - GPR_ASSERT(0); -} -#endif /* GRPC_LINUX_ERRQUEUE */ - /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -818,20 +557,19 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = iov_size; + msg.msg_control = nullptr; + msg.msg_controllen = 0; msg.msg_flags = 0; - if (tcp->outgoing_buffer_arg != nullptr) { - if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, - error)) - return true; /* something went wrong with timestamps */ - } else { - msg.msg_control = nullptr; - msg.msg_controllen = 0; - GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); - GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); + GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); - sent_length = tcp_send(tcp->fd, &msg); - } + GPR_TIMER_SCOPE("sendmsg", 1); + do { + /* TODO(klempner): Cork if this is a partial write */ + GRPC_STATS_INC_SYSCALL_WRITE(); + sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); + } while (sent_length < 0 && errno == EINTR); if (sent_length < 0) { if (errno == EAGAIN) { @@ -855,7 +593,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } GPR_ASSERT(tcp->outgoing_byte_idx == 0); - tcp->bytes_counter += sent_length; trailing = sending_length - static_cast(sent_length); while (trailing > 0) { size_t slice_length; @@ -870,6 +607,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { trailing -= slice_length; } } + if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); @@ -902,13 +640,14 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } + GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { GPR_TIMER_SCOPE("tcp_write", 0); grpc_tcp* tcp = reinterpret_cast(ep); grpc_error* error = GRPC_ERROR_NONE; @@ -936,10 +675,6 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; - tcp->outgoing_buffer_arg = arg; - if (arg) { - GPR_ASSERT(grpc_event_engine_can_track_errors()); - } if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); @@ -1057,8 +792,6 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->bytes_read_this_round = 0; /* Will be set to false by the very first endpoint read function */ tcp->is_first_read = true; - tcp->bytes_counter = -1; - tcp->socket_ts_enabled = false; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -1070,19 +803,6 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); grpc_resource_quota_unref_internal(resource_quota); - gpr_mu_init(&tcp->tb_mu); - tcp->tb_head = nullptr; - /* Start being notified on errors if event engine can track errors. */ - if (grpc_event_engine_can_track_errors()) { - /* Grab a ref to tcp so that we can safely access the tcp struct when - * processing errors. We unref when we no longer want to track errors - * separately. */ - TCP_REF(tcp, "error-tracking"); - gpr_atm_rel_store(&tcp->stop_error_notification, 0); - GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, - grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); - } return &tcp->base; } @@ -1101,11 +821,6 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, tcp->release_fd = fd; tcp->release_fd_cb = done; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); - if (grpc_event_engine_can_track_errors()) { - /* Stop errors notification. */ - gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); - grpc_fd_set_error(tcp->em_fd); - } TCP_UNREF(tcp, "destroy"); } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index eff825cb92..af89bd24db 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -31,10 +31,7 @@ #include -#include "src/core/lib/iomgr/port.h" - #include "src/core/lib/debug/trace.h" -#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 824db07fbf..8ddf684fea 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str); } - grpc_fd* fdobj = grpc_fd_create(fd, name, true); + grpc_fd* fdobj = grpc_fd_create(fd, name, false); read_notifier_pollset = sp->server->pollsets[static_cast(gpr_atm_no_barrier_fetch_add( @@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) { listener->sibling = sp; sp->server = listener->server; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name, true); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = listener->port_index; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 9595c028ce..b9f8145572 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd, s->tail = sp; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name, true); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index 64c4a56ae9..b3cb442f18 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -296,7 +296,7 @@ static void on_write(void* tcpp, grpc_error* error) { /* Initiates a write. */ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; grpc_winsocket* socket = tcp->socket; grpc_winsocket_callback_info* info = &socket->write_info; diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 3dd7cab855..bdb2d0e764 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); - emfd_ = grpc_fd_create(fd, name, true); + emfd_ = grpc_fd_create(fd, name, false); memcpy(&addr_, addr, sizeof(grpc_resolved_address)); GPR_ASSERT(emfd_); gpr_free(name); diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index f40f969bb7..840b2e73bc 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -254,7 +254,7 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, } static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0); unsigned i; @@ -342,7 +342,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, return; } - grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg); + grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); } static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) { diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index d76d582638..aff723ed04 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -259,7 +259,7 @@ static grpc_error* on_handshake_next_done_locked( grpc_slice_buffer_reset_and_unref_internal(&h->outgoing); grpc_slice_buffer_add(&h->outgoing, to_send); grpc_endpoint_write(h->args->endpoint, &h->outgoing, - &h->on_handshake_data_sent_to_peer, nullptr); + &h->on_handshake_data_sent_to_peer); } else if (handshaker_result == nullptr) { // There is nothing to send, but need to read from peer. grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 0f68e823d7..6e6d756eec 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -82,7 +82,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', - 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -103,7 +102,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', - 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index ade23133c5..c03ebcf409 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -115,7 +115,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags, grpc_schedule_on_exec_ctx); /* Write data */ - grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr); + grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure); grpc_core::ExecCtx::Get()->Flush(); /* Await completion, unless the request is large and write may not finish diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index f7396a1684..3d133cfc18 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -104,7 +104,7 @@ static void handle_write() { grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer); grpc_slice_buffer_add(&state.outgoing_buffer, slice); - grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr); + grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write); } static void handle_read(void* arg, grpc_error* error) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index ea9c000efb..f02fa9d998 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -201,7 +201,7 @@ static void on_client_write_done(void* arg, grpc_error* error) { &conn->client_write_buffer); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done, nullptr); + &conn->on_client_write_done); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "write_done"); @@ -226,7 +226,7 @@ static void on_server_write_done(void* arg, grpc_error* error) { &conn->server_write_buffer); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done, nullptr); + &conn->on_server_write_done); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "server_write"); @@ -257,7 +257,7 @@ static void on_client_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "client_read"); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done, nullptr); + &conn->on_server_write_done); } // Read more data. grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, @@ -288,7 +288,7 @@ static void on_server_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "server_read"); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done, nullptr); + &conn->on_client_write_done); } // Read more data. grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, @@ -340,7 +340,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) { grpc_slice_buffer_add(&conn->client_write_buffer, slice); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_write_response_done, nullptr); + &conn->on_write_response_done); } /** diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index 675d9e6278..002671a5fa 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -246,19 +246,6 @@ grpc_cc_test( ], ) -grpc_cc_test( - name = "buffer_list_test", - srcs = ["buffer_list_test.cc"], - language = "C++", - deps = [ - "//:gpr", - "//:grpc", - "//test/core/util:gpr_test_util", - "//test/core/util:grpc_test_util", - ], -) - - grpc_cc_test( name = "tcp_server_posix_test", srcs = ["tcp_server_posix_test.cc"], diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc deleted file mode 100644 index f1773580bd..0000000000 --- a/test/core/iomgr/buffer_list_test.cc +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/iomgr/port.h" - -#include "src/core/lib/iomgr/buffer_list.h" - -#include - -#include "test/core/util/test_config.h" - -#ifdef GRPC_LINUX_ERRQUEUE - -static void TestShutdownFlushesListVerifier(void* arg, - grpc_core::Timestamps* ts, - grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(arg != nullptr); - gpr_atm* done = reinterpret_cast(arg); - gpr_atm_rel_store(done, static_cast(1)); -} - -/** Tests that all TracedBuffer elements in the list are flushed out on - * shutdown. - * Also tests that arg is passed correctly. - */ -static void TestShutdownFlushesList() { - grpc_core::grpc_tcp_set_write_timestamps_callback( - TestShutdownFlushesListVerifier); - grpc_core::TracedBuffer* list = nullptr; -#define NUM_ELEM 5 - gpr_atm verifier_called[NUM_ELEM]; - for (auto i = 0; i < NUM_ELEM; i++) { - gpr_atm_rel_store(&verifier_called[i], static_cast(0)); - grpc_core::TracedBuffer::AddNewEntry( - &list, i, static_cast(&verifier_called[i])); - } - grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); - GPR_ASSERT(list == nullptr); - for (auto i = 0; i < NUM_ELEM; i++) { - GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) == - static_cast(1)); - } -} - -static void TestVerifierCalledOnAckVerifier(void* arg, - grpc_core::Timestamps* ts, - grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->acked_time.tv_sec == 123); - GPR_ASSERT(ts->acked_time.tv_nsec == 456); - gpr_atm* done = reinterpret_cast(arg); - gpr_atm_rel_store(done, static_cast(1)); -} - -/** Tests that the timestamp verifier is called on an ACK timestamp. - */ -static void TestVerifierCalledOnAck() { - struct sock_extended_err serr; - serr.ee_data = 213; - serr.ee_info = SCM_TSTAMP_ACK; - struct scm_timestamping tss; - tss.ts[0].tv_sec = 123; - tss.ts[0].tv_nsec = 456; - grpc_core::grpc_tcp_set_write_timestamps_callback( - TestVerifierCalledOnAckVerifier); - grpc_core::TracedBuffer* list = nullptr; - gpr_atm verifier_called; - gpr_atm_rel_store(&verifier_called, static_cast(0)); - grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called); - grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss); - GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast(1)); - GPR_ASSERT(list == nullptr); - grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); -} - -static void TestTcpBufferList() { - TestVerifierCalledOnAck(); - TestShutdownFlushesList(); -} - -int main(int argc, char** argv) { - grpc_test_init(argc, argv); - grpc_init(); - TestTcpBufferList(); - grpc_shutdown(); - return 0; -} - -#else /* GRPC_LINUX_ERRQUEUE */ - -int main(int argc, char** argv) { return 0; } - -#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index a9e8ba86c5..8db8ac5ed6 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -150,8 +150,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) { &state->current_write_data); grpc_slice_buffer_reset_and_unref(&state->outgoing); grpc_slice_buffer_addn(&state->outgoing, slices, nslices); - grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write, - nullptr); + grpc_endpoint_write(state->write_ep, &state->outgoing, + &state->done_write); gpr_free(slices); return; } @@ -294,8 +294,7 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); grpc_endpoint_write(f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, - grpc_schedule_on_exec_ctx), - nullptr); + grpc_schedule_on_exec_ctx)); wait_for_fail_count(&fail_count, 3); grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index 6447cc234d..3e87831e44 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -36,9 +36,6 @@ #include #include "src/core/lib/gpr/useful.h" -#include "src/core/lib/iomgr/buffer_list.h" -#include "src/core/lib/iomgr/ev_posix.h" -#include "src/core/lib/iomgr/sockaddr_posix.h" #include "src/core/lib/slice/slice_internal.h" #include "test/core/iomgr/endpoint_tests.h" #include "test/core/util/test_config.h" @@ -71,43 +68,6 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); } -static void create_inet_sockets(int sv[2]) { - /* Prepare listening socket */ - struct sockaddr_in addr; - memset(&addr, 0, sizeof(struct sockaddr_in)); - addr.sin_family = AF_INET; - int sock = socket(AF_INET, SOCK_STREAM, 0); - GPR_ASSERT(sock); - GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0); - listen(sock, 1); - - /* Prepare client socket and connect to server */ - socklen_t len = sizeof(sockaddr_in); - GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0); - - int client = socket(AF_INET, SOCK_STREAM, 0); - GPR_ASSERT(client); - int ret; - do { - ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in)); - } while (ret == -1 && errno == EINTR); - - /* Accept client connection */ - len = sizeof(socklen_t); - int server; - do { - server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len); - } while (server == -1 && errno == EINTR); - GPR_ASSERT(server != -1); - - sv[0] = server; - sv[1] = client; - int flags = fcntl(sv[0], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); - flags = fcntl(sv[1], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); -} - static ssize_t fill_socket(int fd) { ssize_t write_bytes; ssize_t total_bytes = 0; @@ -329,10 +289,11 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size, static void write_done(void* user_data /* write_socket_state */, grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); struct write_socket_state* state = static_cast(user_data); + gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(g_mu); + gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; GPR_ASSERT( GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); @@ -379,24 +340,10 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { gpr_free(buf); } -/* Verifier for timestamps callback for write_test */ -void timestamps_verifier(void* arg, grpc_core::Timestamps* ts, - grpc_error* error) { - GPR_ASSERT(error == GRPC_ERROR_NONE); - GPR_ASSERT(arg != nullptr); - GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME); - GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); - gpr_atm* done_timestamps = (gpr_atm*)arg; - gpr_atm_rel_store(done_timestamps, static_cast(1)); -} - /* Write to a socket using the grpc_tcp API, then drain it directly. Note that if the write does not complete immediately we need to drain the - socket in parallel with the read. If collect_timestamps is true, it will - try to get timestamps for the write. */ -static void write_test(size_t num_bytes, size_t slice_size, - bool collect_timestamps) { + socket in parallel with the read. */ +static void write_test(size_t num_bytes, size_t slice_size) { int sv[2]; grpc_endpoint* ep; struct write_socket_state state; @@ -409,27 +356,19 @@ static void write_test(size_t num_bytes, size_t slice_size, grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); grpc_core::ExecCtx exec_ctx; - if (collect_timestamps && !grpc_event_engine_can_track_errors()) { - return; - } - gpr_log(GPR_INFO, "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR, num_bytes, slice_size); - if (collect_timestamps) { - create_inet_sockets(sv); - } else { - create_sockets(sv); - } + create_sockets(sv); grpc_arg a[1]; a[0].key = const_cast(GRPC_ARG_TCP_READ_CHUNK_SIZE); a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = static_cast(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps), - &args, "test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args, + "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); state.ep = ep; @@ -442,26 +381,18 @@ static void write_test(size_t num_bytes, size_t slice_size, GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - gpr_atm done_timestamps; - gpr_atm_rel_store(&done_timestamps, static_cast(0)); - grpc_endpoint_write(ep, &outgoing, &write_done_closure, - grpc_event_engine_can_track_errors() && collect_timestamps - ? (void*)&done_timestamps - : nullptr); + grpc_endpoint_write(ep, &outgoing, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); - exec_ctx.Flush(); gpr_mu_lock(g_mu); for (;;) { grpc_pollset_worker* worker = nullptr; - if (state.write_done && - (!(grpc_event_engine_can_track_errors() && collect_timestamps) || - gpr_atm_acq_load(&done_timestamps) == static_cast(1))) { + if (state.write_done) { break; } GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - exec_ctx.Flush(); + gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); @@ -566,21 +497,14 @@ void run_tests(void) { large_read_test(8192); large_read_test(1); - write_test(100, 8192, false); - write_test(100, 1, false); - write_test(100000, 8192, false); - write_test(100000, 1, false); - write_test(100000, 137, false); - - write_test(100, 8192, true); - write_test(100, 1, true); - write_test(100000, 8192, true); - write_test(100000, 1, true); - write_test(100, 137, true); + write_test(100, 8192); + write_test(100, 1); + write_test(100000, 8192); + write_test(100000, 1); + write_test(100000, 137); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { - write_test(40320, i, false); - write_test(40320, i, true); + write_test(40320, i); } release_fd_test(100, 8192); @@ -625,7 +549,6 @@ int main(int argc, char** argv) { grpc_closure destroyed; grpc_test_init(argc, argv); grpc_init(); - grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier); { grpc_core::ExecCtx exec_ctx; g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc index ef6fd62b51..1156cd5fc5 100644 --- a/test/core/util/mock_endpoint.cc +++ b/test/core/util/mock_endpoint.cc @@ -55,7 +55,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, } static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { mock_endpoint* m = reinterpret_cast(ep); for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index 3cc8ad6fe1..5958216747 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -76,7 +76,7 @@ static half* other_half(half* h) { } static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { half* m = other_half(reinterpret_cast(ep)); gpr_mu_lock(&m->parent->mu); grpc_error* error = GRPC_ERROR_NONE; diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index 62ed72a629..f2efb049b4 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -62,7 +62,7 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) { } static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { trickle_endpoint* te = reinterpret_cast(ep); gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == nullptr); @@ -186,8 +186,7 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) { te->last_write = now; grpc_endpoint_write( te->wrapped, &te->writing_buffer, - GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx), - nullptr); + GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx)); maybe_call_write_cb_locked(te); } } diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 189923a841..1e9bd273aa 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -96,7 +96,7 @@ class DummyEndpoint : public grpc_endpoint { } static void write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb, void* arg) { + grpc_closure* cb) { GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); } diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index e6d4b1d98f..43ebf8cad9 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1067,7 +1067,6 @@ src/core/lib/http/format_request.h \ src/core/lib/http/httpcli.h \ src/core/lib/http/parser.h \ src/core/lib/iomgr/block_annotate.h \ -src/core/lib/iomgr/buffer_list.h \ src/core/lib/iomgr/call_combiner.h \ src/core/lib/iomgr/closure.h \ src/core/lib/iomgr/combiner.h \ @@ -1083,7 +1082,6 @@ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/exec_ctx.h \ src/core/lib/iomgr/executor.h \ src/core/lib/iomgr/gethostname.h \ -src/core/lib/iomgr/internal_errqueue.h \ src/core/lib/iomgr/iocp_windows.h \ src/core/lib/iomgr/iomgr.h \ src/core/lib/iomgr/iomgr_custom.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 7cd1dc7bf3..c1706fd070 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1159,8 +1159,6 @@ src/core/lib/http/parser.cc \ src/core/lib/http/parser.h \ src/core/lib/iomgr/README.md \ src/core/lib/iomgr/block_annotate.h \ -src/core/lib/iomgr/buffer_list.cc \ -src/core/lib/iomgr/buffer_list.h \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/call_combiner.h \ src/core/lib/iomgr/closure.h \ @@ -1196,8 +1194,6 @@ src/core/lib/iomgr/gethostname.h \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ -src/core/lib/iomgr/internal_errqueue.cc \ -src/core/lib/iomgr/internal_errqueue.h \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iocp_windows.h \ src/core/lib/iomgr/iomgr.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 8ea5126fde..5014aea9cd 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -163,23 +163,6 @@ "third_party": false, "type": "target" }, - { - "deps": [ - "gpr", - "gpr_test_util", - "grpc", - "grpc_test_util" - ], - "headers": [], - "is_filegroup": false, - "language": "c", - "name": "buffer_list_test", - "src": [ - "test/core/iomgr/buffer_list_test.cc" - ], - "third_party": false, - "type": "target" - }, { "deps": [ "gpr", @@ -9505,7 +9488,6 @@ "src/core/lib/http/format_request.cc", "src/core/lib/http/httpcli.cc", "src/core/lib/http/parser.cc", - "src/core/lib/iomgr/buffer_list.cc", "src/core/lib/iomgr/call_combiner.cc", "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/endpoint.cc", @@ -9526,7 +9508,6 @@ "src/core/lib/iomgr/gethostname_fallback.cc", "src/core/lib/iomgr/gethostname_host_name_max.cc", "src/core/lib/iomgr/gethostname_sysconf.cc", - "src/core/lib/iomgr/internal_errqueue.cc", "src/core/lib/iomgr/iocp_windows.cc", "src/core/lib/iomgr/iomgr.cc", "src/core/lib/iomgr/iomgr_custom.cc", @@ -9686,7 +9667,6 @@ "src/core/lib/http/httpcli.h", "src/core/lib/http/parser.h", "src/core/lib/iomgr/block_annotate.h", - "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/combiner.h", @@ -9702,7 +9682,6 @@ "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", - "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", @@ -9838,7 +9817,6 @@ "src/core/lib/http/httpcli.h", "src/core/lib/http/parser.h", "src/core/lib/iomgr/block_annotate.h", - "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/combiner.h", @@ -9854,7 +9832,6 @@ "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", - "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index fba76d69d1..51c7b57d8a 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -195,26 +195,6 @@ ], "uses_polling": false }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [ - "uv" - ], - "flaky": false, - "gtest": false, - "language": "c", - "name": "buffer_list_test", - "platforms": [ - "linux" - ], - "uses_polling": true - }, { "args": [], "benchmark": false, -- cgit v1.2.3 From 8d47cd4992c3c3546bef23568fde5fe132144666 Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 27 Aug 2018 14:56:01 -0700 Subject: Revert "Revert "Fathom tcp changes"" --- BUILD | 4 + CMakeLists.txt | 46 +++ Makefile | 48 ++++ build.yaml | 18 ++ config.m4 | 2 + config.w32 | 2 + gRPC-C++.podspec | 4 + gRPC-Core.podspec | 6 + grpc.gemspec | 4 + grpc.gyp | 8 + package.xml | 4 + .../client_channel/http_connect_handshaker.cc | 2 +- .../chttp2/client/insecure/channel_create_posix.cc | 2 +- .../chttp2/server/insecure/server_chttp2_posix.cc | 2 +- .../transport/chttp2/transport/chttp2_transport.cc | 3 +- src/core/lib/http/httpcli.cc | 2 +- src/core/lib/iomgr/buffer_list.cc | 134 +++++++++ src/core/lib/iomgr/buffer_list.h | 96 +++++++ src/core/lib/iomgr/endpoint.cc | 4 +- src/core/lib/iomgr/endpoint.h | 8 +- src/core/lib/iomgr/endpoint_cfstream.cc | 2 +- src/core/lib/iomgr/endpoint_pair_posix.cc | 4 +- src/core/lib/iomgr/ev_posix.cc | 9 +- src/core/lib/iomgr/internal_errqueue.cc | 40 +++ src/core/lib/iomgr/internal_errqueue.h | 62 ++++ src/core/lib/iomgr/port.h | 6 + src/core/lib/iomgr/tcp_client_posix.cc | 2 +- src/core/lib/iomgr/tcp_custom.cc | 2 +- src/core/lib/iomgr/tcp_posix.cc | 311 ++++++++++++++++++++- src/core/lib/iomgr/tcp_posix.h | 3 + src/core/lib/iomgr/tcp_server_posix.cc | 4 +- .../lib/iomgr/tcp_server_utils_posix_common.cc | 2 +- src/core/lib/iomgr/tcp_windows.cc | 2 +- src/core/lib/iomgr/udp_server.cc | 2 +- src/core/lib/security/transport/secure_endpoint.cc | 4 +- .../lib/security/transport/security_handshaker.cc | 2 +- src/python/grpcio/grpc_core_dependencies.py | 2 + test/core/bad_client/bad_client.cc | 2 +- test/core/end2end/bad_server_response_test.cc | 2 +- test/core/end2end/fixtures/http_proxy_fixture.cc | 10 +- test/core/iomgr/BUILD | 13 + test/core/iomgr/buffer_list_test.cc | 111 ++++++++ test/core/iomgr/endpoint_tests.cc | 7 +- test/core/iomgr/tcp_posix_test.cc | 109 ++++++-- test/core/util/mock_endpoint.cc | 2 +- test/core/util/passthru_endpoint.cc | 2 +- test/core/util/trickle_endpoint.cc | 5 +- test/cpp/microbenchmarks/bm_chttp2_transport.cc | 2 +- tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 4 + tools/run_tests/generated/sources_and_headers.json | 23 ++ tools/run_tests/generated/tests.json | 20 ++ 52 files changed, 1104 insertions(+), 68 deletions(-) create mode 100644 src/core/lib/iomgr/buffer_list.cc create mode 100644 src/core/lib/iomgr/buffer_list.h create mode 100644 src/core/lib/iomgr/internal_errqueue.cc create mode 100644 src/core/lib/iomgr/internal_errqueue.h create mode 100644 test/core/iomgr/buffer_list_test.cc (limited to 'src/core/lib/iomgr/port.h') diff --git a/BUILD b/BUILD index e1b00a5cf0..c01b971281 100644 --- a/BUILD +++ b/BUILD @@ -696,6 +696,7 @@ grpc_cc_library( "src/core/lib/http/format_request.cc", "src/core/lib/http/httpcli.cc", "src/core/lib/http/parser.cc", + "src/core/lib/iomgr/buffer_list.cc", "src/core/lib/iomgr/call_combiner.cc", "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/endpoint.cc", @@ -716,6 +717,7 @@ grpc_cc_library( "src/core/lib/iomgr/gethostname_fallback.cc", "src/core/lib/iomgr/gethostname_host_name_max.cc", "src/core/lib/iomgr/gethostname_sysconf.cc", + "src/core/lib/iomgr/internal_errqueue.cc", "src/core/lib/iomgr/iocp_windows.cc", "src/core/lib/iomgr/iomgr.cc", "src/core/lib/iomgr/iomgr_custom.cc", @@ -845,6 +847,7 @@ grpc_cc_library( "src/core/lib/http/format_request.h", "src/core/lib/http/httpcli.h", "src/core/lib/http/parser.h", + "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/block_annotate.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/closure.h", @@ -862,6 +865,7 @@ grpc_cc_library( "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", "src/core/lib/iomgr/gevent_util.h", + "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index a330fefc27..3895d4c0f1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -230,6 +230,9 @@ add_dependencies(buildtests_c avl_test) add_dependencies(buildtests_c bad_server_response_test) add_dependencies(buildtests_c bin_decoder_test) add_dependencies(buildtests_c bin_encoder_test) +if(_gRPC_PLATFORM_LINUX) +add_dependencies(buildtests_c buffer_list_test) +endif() add_dependencies(buildtests_c channel_create_test) add_dependencies(buildtests_c chttp2_hpack_encoder_test) add_dependencies(buildtests_c chttp2_stream_map_test) @@ -959,6 +962,7 @@ add_library(grpc src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc + src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -979,6 +983,7 @@ add_library(grpc src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc + src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -1365,6 +1370,7 @@ add_library(grpc_cronet src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc + src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -1385,6 +1391,7 @@ add_library(grpc_cronet src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc + src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -1757,6 +1764,7 @@ add_library(grpc_test_util src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc + src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -1777,6 +1785,7 @@ add_library(grpc_test_util src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc + src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -2065,6 +2074,7 @@ add_library(grpc_test_util_unsecure src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc + src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -2085,6 +2095,7 @@ add_library(grpc_test_util_unsecure src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc + src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -2352,6 +2363,7 @@ add_library(grpc_unsecure src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc + src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -2372,6 +2384,7 @@ add_library(grpc_unsecure src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc + src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -3192,6 +3205,7 @@ add_library(grpc++_cronet src/core/lib/http/format_request.cc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc + src/core/lib/iomgr/buffer_list.cc src/core/lib/iomgr/call_combiner.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc @@ -3212,6 +3226,7 @@ add_library(grpc++_cronet src/core/lib/iomgr/gethostname_fallback.cc src/core/lib/iomgr/gethostname_host_name_max.cc src/core/lib/iomgr/gethostname_sysconf.cc + src/core/lib/iomgr/internal_errqueue.cc src/core/lib/iomgr/iocp_windows.cc src/core/lib/iomgr/iomgr.cc src/core/lib/iomgr/iomgr_custom.cc @@ -5835,6 +5850,37 @@ target_link_libraries(bin_encoder_test grpc ) +endif (gRPC_BUILD_TESTS) +if (gRPC_BUILD_TESTS) +if(_gRPC_PLATFORM_LINUX) + +add_executable(buffer_list_test + test/core/iomgr/buffer_list_test.cc +) + + +target_include_directories(buffer_list_test + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include + PRIVATE ${_gRPC_SSL_INCLUDE_DIR} + PRIVATE ${_gRPC_PROTOBUF_INCLUDE_DIR} + PRIVATE ${_gRPC_ZLIB_INCLUDE_DIR} + PRIVATE ${_gRPC_BENCHMARK_INCLUDE_DIR} + PRIVATE ${_gRPC_CARES_INCLUDE_DIR} + PRIVATE ${_gRPC_GFLAGS_INCLUDE_DIR} + PRIVATE ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + PRIVATE ${_gRPC_NANOPB_INCLUDE_DIR} +) + +target_link_libraries(buffer_list_test + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_test_util + grpc + gpr_test_util + gpr +) + +endif() endif (gRPC_BUILD_TESTS) if (gRPC_BUILD_TESTS) diff --git a/Makefile b/Makefile index e9a9486a3c..7caf585ee9 100644 --- a/Makefile +++ b/Makefile @@ -978,6 +978,7 @@ avl_test: $(BINDIR)/$(CONFIG)/avl_test bad_server_response_test: $(BINDIR)/$(CONFIG)/bad_server_response_test bin_decoder_test: $(BINDIR)/$(CONFIG)/bin_decoder_test bin_encoder_test: $(BINDIR)/$(CONFIG)/bin_encoder_test +buffer_list_test: $(BINDIR)/$(CONFIG)/buffer_list_test channel_create_test: $(BINDIR)/$(CONFIG)/channel_create_test check_epollexclusive: $(BINDIR)/$(CONFIG)/check_epollexclusive chttp2_hpack_encoder_test: $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test @@ -1434,6 +1435,7 @@ buildtests_c: privatelibs_c \ $(BINDIR)/$(CONFIG)/bad_server_response_test \ $(BINDIR)/$(CONFIG)/bin_decoder_test \ $(BINDIR)/$(CONFIG)/bin_encoder_test \ + $(BINDIR)/$(CONFIG)/buffer_list_test \ $(BINDIR)/$(CONFIG)/channel_create_test \ $(BINDIR)/$(CONFIG)/chttp2_hpack_encoder_test \ $(BINDIR)/$(CONFIG)/chttp2_stream_map_test \ @@ -1950,6 +1952,8 @@ test_c: buildtests_c $(Q) $(BINDIR)/$(CONFIG)/bin_decoder_test || ( echo test bin_decoder_test failed ; exit 1 ) $(E) "[RUN] Testing bin_encoder_test" $(Q) $(BINDIR)/$(CONFIG)/bin_encoder_test || ( echo test bin_encoder_test failed ; exit 1 ) + $(E) "[RUN] Testing buffer_list_test" + $(Q) $(BINDIR)/$(CONFIG)/buffer_list_test || ( echo test buffer_list_test failed ; exit 1 ) $(E) "[RUN] Testing channel_create_test" $(Q) $(BINDIR)/$(CONFIG)/channel_create_test || ( echo test channel_create_test failed ; exit 1 ) $(E) "[RUN] Testing chttp2_hpack_encoder_test" @@ -3460,6 +3464,7 @@ LIBGRPC_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ + src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -3480,6 +3485,7 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ + src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -3865,6 +3871,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ + src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -3885,6 +3892,7 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ + src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -4255,6 +4263,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ + src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -4275,6 +4284,7 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ + src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -4554,6 +4564,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ + src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -4574,6 +4585,7 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ + src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -4819,6 +4831,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ + src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -4839,6 +4852,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ + src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -5647,6 +5661,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ + src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -5667,6 +5682,7 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ + src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ @@ -10701,6 +10717,38 @@ endif endif +BUFFER_LIST_TEST_SRC = \ + test/core/iomgr/buffer_list_test.cc \ + +BUFFER_LIST_TEST_OBJS = $(addprefix $(OBJDIR)/$(CONFIG)/, $(addsuffix .o, $(basename $(BUFFER_LIST_TEST_SRC)))) +ifeq ($(NO_SECURE),true) + +# You can't build secure targets if you don't have OpenSSL. + +$(BINDIR)/$(CONFIG)/buffer_list_test: openssl_dep_error + +else + + + +$(BINDIR)/$(CONFIG)/buffer_list_test: $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + $(E) "[LD] Linking $@" + $(Q) mkdir -p `dirname $@` + $(Q) $(LD) $(LDFLAGS) $(BUFFER_LIST_TEST_OBJS) $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a $(LDLIBS) $(LDLIBS_SECURE) -o $(BINDIR)/$(CONFIG)/buffer_list_test + +endif + +$(OBJDIR)/$(CONFIG)/test/core/iomgr/buffer_list_test.o: $(LIBDIR)/$(CONFIG)/libgrpc_test_util.a $(LIBDIR)/$(CONFIG)/libgrpc.a $(LIBDIR)/$(CONFIG)/libgpr_test_util.a $(LIBDIR)/$(CONFIG)/libgpr.a + +deps_buffer_list_test: $(BUFFER_LIST_TEST_OBJS:.o=.dep) + +ifneq ($(NO_SECURE),true) +ifneq ($(NO_DEPS),true) +-include $(BUFFER_LIST_TEST_OBJS:.o=.dep) +endif +endif + + CHANNEL_CREATE_TEST_SRC = \ test/core/surface/channel_create_test.cc \ diff --git a/build.yaml b/build.yaml index ce21068c31..a7ebee7572 100644 --- a/build.yaml +++ b/build.yaml @@ -256,6 +256,7 @@ filegroups: - src/core/lib/http/format_request.cc - src/core/lib/http/httpcli.cc - src/core/lib/http/parser.cc + - src/core/lib/iomgr/buffer_list.cc - src/core/lib/iomgr/call_combiner.cc - src/core/lib/iomgr/combiner.cc - src/core/lib/iomgr/endpoint.cc @@ -276,6 +277,7 @@ filegroups: - src/core/lib/iomgr/gethostname_fallback.cc - src/core/lib/iomgr/gethostname_host_name_max.cc - src/core/lib/iomgr/gethostname_sysconf.cc + - src/core/lib/iomgr/internal_errqueue.cc - src/core/lib/iomgr/iocp_windows.cc - src/core/lib/iomgr/iomgr.cc - src/core/lib/iomgr/iomgr_custom.cc @@ -434,6 +436,7 @@ filegroups: - src/core/lib/http/httpcli.h - src/core/lib/http/parser.h - src/core/lib/iomgr/block_annotate.h + - src/core/lib/iomgr/buffer_list.h - src/core/lib/iomgr/call_combiner.h - src/core/lib/iomgr/closure.h - src/core/lib/iomgr/combiner.h @@ -449,6 +452,7 @@ filegroups: - src/core/lib/iomgr/exec_ctx.h - src/core/lib/iomgr/executor.h - src/core/lib/iomgr/gethostname.h + - src/core/lib/iomgr/internal_errqueue.h - src/core/lib/iomgr/iocp_windows.h - src/core/lib/iomgr/iomgr.h - src/core/lib/iomgr/iomgr_custom.h @@ -2139,6 +2143,20 @@ targets: - grpc_test_util - grpc uses_polling: false +- name: buffer_list_test + build: test + language: c + src: + - test/core/iomgr/buffer_list_test.cc + deps: + - grpc_test_util + - grpc + - gpr_test_util + - gpr + exclude_iomgrs: + - uv + platforms: + - linux - name: channel_create_test build: test language: c diff --git a/config.m4 b/config.m4 index 7825274eea..af3624cdd1 100644 --- a/config.m4 +++ b/config.m4 @@ -108,6 +108,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/http/format_request.cc \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ + src/core/lib/iomgr/buffer_list.cc \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ @@ -128,6 +129,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ + src/core/lib/iomgr/internal_errqueue.cc \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iomgr.cc \ src/core/lib/iomgr/iomgr_custom.cc \ diff --git a/config.w32 b/config.w32 index a9d1e6c9d0..ad91ee40bd 100644 --- a/config.w32 +++ b/config.w32 @@ -83,6 +83,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\http\\format_request.cc " + "src\\core\\lib\\http\\httpcli.cc " + "src\\core\\lib\\http\\parser.cc " + + "src\\core\\lib\\iomgr\\buffer_list.cc " + "src\\core\\lib\\iomgr\\call_combiner.cc " + "src\\core\\lib\\iomgr\\combiner.cc " + "src\\core\\lib\\iomgr\\endpoint.cc " + @@ -103,6 +104,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\gethostname_fallback.cc " + "src\\core\\lib\\iomgr\\gethostname_host_name_max.cc " + "src\\core\\lib\\iomgr\\gethostname_sysconf.cc " + + "src\\core\\lib\\iomgr\\internal_errqueue.cc " + "src\\core\\lib\\iomgr\\iocp_windows.cc " + "src\\core\\lib\\iomgr\\iomgr.cc " + "src\\core\\lib\\iomgr\\iomgr_custom.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 207d7baa8f..a387794f57 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -382,6 +382,7 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', + 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -397,6 +398,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', + 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', @@ -570,6 +572,7 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', + 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -585,6 +588,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', + 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index f1a9bce7f2..9832283e5c 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -394,6 +394,7 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', + 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -409,6 +410,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', + 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', @@ -538,6 +540,7 @@ Pod::Spec.new do |s| 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', + 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -558,6 +561,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', + 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -993,6 +997,7 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.h', 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', + 'src/core/lib/iomgr/buffer_list.h', 'src/core/lib/iomgr/call_combiner.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', @@ -1008,6 +1013,7 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/exec_ctx.h', 'src/core/lib/iomgr/executor.h', 'src/core/lib/iomgr/gethostname.h', + 'src/core/lib/iomgr/internal_errqueue.h', 'src/core/lib/iomgr/iocp_windows.h', 'src/core/lib/iomgr/iomgr.h', 'src/core/lib/iomgr/iomgr_custom.h', diff --git a/grpc.gemspec b/grpc.gemspec index 5c38071d15..c8e58faec9 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -330,6 +330,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/http/httpcli.h ) s.files += %w( src/core/lib/http/parser.h ) s.files += %w( src/core/lib/iomgr/block_annotate.h ) + s.files += %w( src/core/lib/iomgr/buffer_list.h ) s.files += %w( src/core/lib/iomgr/call_combiner.h ) s.files += %w( src/core/lib/iomgr/closure.h ) s.files += %w( src/core/lib/iomgr/combiner.h ) @@ -345,6 +346,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/exec_ctx.h ) s.files += %w( src/core/lib/iomgr/executor.h ) s.files += %w( src/core/lib/iomgr/gethostname.h ) + s.files += %w( src/core/lib/iomgr/internal_errqueue.h ) s.files += %w( src/core/lib/iomgr/iocp_windows.h ) s.files += %w( src/core/lib/iomgr/iomgr.h ) s.files += %w( src/core/lib/iomgr/iomgr_custom.h ) @@ -474,6 +476,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/http/format_request.cc ) s.files += %w( src/core/lib/http/httpcli.cc ) s.files += %w( src/core/lib/http/parser.cc ) + s.files += %w( src/core/lib/iomgr/buffer_list.cc ) s.files += %w( src/core/lib/iomgr/call_combiner.cc ) s.files += %w( src/core/lib/iomgr/combiner.cc ) s.files += %w( src/core/lib/iomgr/endpoint.cc ) @@ -494,6 +497,7 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/gethostname_fallback.cc ) s.files += %w( src/core/lib/iomgr/gethostname_host_name_max.cc ) s.files += %w( src/core/lib/iomgr/gethostname_sysconf.cc ) + s.files += %w( src/core/lib/iomgr/internal_errqueue.cc ) s.files += %w( src/core/lib/iomgr/iocp_windows.cc ) s.files += %w( src/core/lib/iomgr/iomgr.cc ) s.files += %w( src/core/lib/iomgr/iomgr_custom.cc ) diff --git a/grpc.gyp b/grpc.gyp index a36998bcb3..654a531092 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -300,6 +300,7 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', + 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -320,6 +321,7 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', + 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -660,6 +662,7 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', + 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -680,6 +683,7 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', + 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -893,6 +897,7 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', + 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -913,6 +918,7 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', + 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', @@ -1104,6 +1110,7 @@ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', + 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -1124,6 +1131,7 @@ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', + 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', diff --git a/package.xml b/package.xml index 1bdf127c36..537c5404e7 100644 --- a/package.xml +++ b/package.xml @@ -335,6 +335,7 @@ + @@ -350,6 +351,7 @@ + @@ -479,6 +481,7 @@ + @@ -499,6 +502,7 @@ + diff --git a/src/core/ext/filters/client_channel/http_connect_handshaker.cc b/src/core/ext/filters/client_channel/http_connect_handshaker.cc index 4e8b8b71db..7ce8da8c00 100644 --- a/src/core/ext/filters/client_channel/http_connect_handshaker.cc +++ b/src/core/ext/filters/client_channel/http_connect_handshaker.cc @@ -320,7 +320,7 @@ static void http_connect_handshaker_do_handshake( // Take a new ref to be held by the write callback. gpr_ref(&handshaker->refcount); grpc_endpoint_write(args->endpoint, &handshaker->write_buffer, - &handshaker->request_done_closure); + &handshaker->request_done_closure, nullptr); gpr_mu_unlock(&handshaker->mu); } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index dfed824cd5..5bdcb387c9 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client", false), args, "fd-client"); + grpc_fd_create(fd, "client", true), args, "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index a0228785ee..e4bd91d07b 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -44,7 +44,7 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, gpr_asprintf(&name, "fd:%d", fd); grpc_endpoint* server_endpoint = - grpc_tcp_create(grpc_fd_create(fd, name, false), + grpc_tcp_create(grpc_fd_create(fd, name, true), grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 36511fa608..027a57d606 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1029,7 +1029,8 @@ static void write_action(void* gt, grpc_error* error) { grpc_endpoint_write( t->ep, &t->outbuf, GRPC_CLOSURE_INIT(&t->write_action_end_locked, write_action_end_locked, t, - grpc_combiner_scheduler(t->combiner))); + grpc_combiner_scheduler(t->combiner)), + nullptr); } /* Callback from the grpc_endpoint after bytes have been written by calling diff --git a/src/core/lib/http/httpcli.cc b/src/core/lib/http/httpcli.cc index 12060074c5..3bd7a2ce59 100644 --- a/src/core/lib/http/httpcli.cc +++ b/src/core/lib/http/httpcli.cc @@ -163,7 +163,7 @@ static void done_write(void* arg, grpc_error* error) { static void start_write(internal_request* req) { grpc_slice_ref_internal(req->request_text); grpc_slice_buffer_add(&req->outgoing, req->request_text); - grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write); + grpc_endpoint_write(req->ep, &req->outgoing, &req->done_write, nullptr); } static void on_handshake_done(void* arg, grpc_endpoint* ep) { diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc new file mode 100644 index 0000000000..6ada23db1c --- /dev/null +++ b/src/core/lib/iomgr/buffer_list.cc @@ -0,0 +1,134 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/core/lib/iomgr/buffer_list.h" +#include "src/core/lib/iomgr/port.h" + +#include + +#ifdef GRPC_LINUX_ERRQUEUE +#include + +#include "src/core/lib/gprpp/memory.h" + +namespace grpc_core { +void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no, + void* arg) { + GPR_DEBUG_ASSERT(head != nullptr); + TracedBuffer* new_elem = New(seq_no, arg); + /* Store the current time as the sendmsg time. */ + new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME); + if (*head == nullptr) { + *head = new_elem; + return; + } + /* Append at the end. */ + TracedBuffer* ptr = *head; + while (ptr->next_ != nullptr) { + ptr = ptr->next_; + } + ptr->next_ = new_elem; +} + +namespace { +/** Fills gpr_timespec gts based on values from timespec ts */ +void fill_gpr_from_timestamp(gpr_timespec* gts, const struct timespec* ts) { + gts->tv_sec = ts->tv_sec; + gts->tv_nsec = static_cast(ts->tv_nsec); + gts->clock_type = GPR_CLOCK_REALTIME; +} + +/** The saved callback function that will be invoked when we get all the + * timestamps that we are going to get for a TracedBuffer. */ +void (*timestamps_callback)(void*, grpc_core::Timestamps*, + grpc_error* shutdown_err); +} /* namespace */ + +void TracedBuffer::ProcessTimestamp(TracedBuffer** head, + struct sock_extended_err* serr, + struct scm_timestamping* tss) { + GPR_DEBUG_ASSERT(head != nullptr); + TracedBuffer* elem = *head; + TracedBuffer* next = nullptr; + while (elem != nullptr) { + /* The byte number refers to the sequence number of the last byte which this + * timestamp relates to. */ + if (serr->ee_data >= elem->seq_no_) { + switch (serr->ee_info) { + case SCM_TSTAMP_SCHED: + fill_gpr_from_timestamp(&(elem->ts_.scheduled_time), &(tss->ts[0])); + elem = elem->next_; + break; + case SCM_TSTAMP_SND: + fill_gpr_from_timestamp(&(elem->ts_.sent_time), &(tss->ts[0])); + elem = elem->next_; + break; + case SCM_TSTAMP_ACK: + fill_gpr_from_timestamp(&(elem->ts_.acked_time), &(tss->ts[0])); + /* Got all timestamps. Do the callback and free this TracedBuffer. + * The thing below can be passed by value if we don't want the + * restriction on the lifetime. */ + timestamps_callback(elem->arg_, &(elem->ts_), GRPC_ERROR_NONE); + next = elem->next_; + Delete(elem); + *head = elem = next; + break; + default: + abort(); + } + } else { + break; + } + } +} + +void TracedBuffer::Shutdown(TracedBuffer** head, grpc_error* shutdown_err) { + GPR_DEBUG_ASSERT(head != nullptr); + TracedBuffer* elem = *head; + while (elem != nullptr) { + if (timestamps_callback) { + timestamps_callback(elem->arg_, &(elem->ts_), shutdown_err); + } + auto* next = elem->next_; + Delete(elem); + elem = next; + } + *head = nullptr; + GRPC_ERROR_UNREF(shutdown_err); +} + +void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, + grpc_core::Timestamps*, + grpc_error* error)) { + timestamps_callback = fn; +} +} /* namespace grpc_core */ + +#else /* GRPC_LINUX_ERRQUEUE */ + +namespace grpc_core { +void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, + grpc_core::Timestamps*, + grpc_error* error)) { + gpr_log(GPR_DEBUG, "Timestamps callback is not enabled for this platform"); +} +} /* namespace grpc_core */ + +#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h new file mode 100644 index 0000000000..cbbf50a657 --- /dev/null +++ b/src/core/lib/iomgr/buffer_list.h @@ -0,0 +1,96 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H +#define GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H + +#include + +#include "src/core/lib/iomgr/port.h" + +#include + +#include "src/core/lib/gprpp/memory.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/internal_errqueue.h" + +namespace grpc_core { +struct Timestamps { + /* TODO(yashykt): This would also need to store OPTSTAT once support is added + */ + gpr_timespec sendmsg_time; + gpr_timespec scheduled_time; + gpr_timespec sent_time; + gpr_timespec acked_time; +}; + +/** TracedBuffer is a class to keep track of timestamps for a specific buffer in + * the TCP layer. We are only tracking timestamps for Linux kernels and hence + * this class would only be used by Linux platforms. For all other platforms, + * TracedBuffer would be an empty class. + * + * The timestamps collected are according to grpc_core::Timestamps declared + * above. + * + * A TracedBuffer list is kept track of using the head element of the list. If + * the head element of the list is nullptr, then the list is empty. + */ +#ifdef GRPC_LINUX_ERRQUEUE +class TracedBuffer { + public: + /** Add a new entry in the TracedBuffer list pointed to by head. Also saves + * sendmsg_time with the current timestamp. */ + static void AddNewEntry(grpc_core::TracedBuffer** head, uint32_t seq_no, + void* arg); + + /** Processes a received timestamp based on sock_extended_err and + * scm_timestamping structures. It will invoke the timestamps callback if the + * timestamp type is SCM_TSTAMP_ACK. */ + static void ProcessTimestamp(grpc_core::TracedBuffer** head, + struct sock_extended_err* serr, + struct scm_timestamping* tss); + + /** Cleans the list by calling the callback for each traced buffer in the list + * with timestamps that it has. */ + static void Shutdown(grpc_core::TracedBuffer** head, + grpc_error* shutdown_err); + + private: + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW + + TracedBuffer(int seq_no, void* arg) + : seq_no_(seq_no), arg_(arg), next_(nullptr) {} + + uint32_t seq_no_; /* The sequence number for the last byte in the buffer */ + void* arg_; /* The arg to pass to timestamps_callback */ + grpc_core::Timestamps ts_; /* The timestamps corresponding to this buffer */ + grpc_core::TracedBuffer* next_; /* The next TracedBuffer in the list */ +}; +#else /* GRPC_LINUX_ERRQUEUE */ +class TracedBuffer {}; +#endif /* GRPC_LINUX_ERRQUEUE */ + +/** Sets the callback function to call when timestamps for a write are + * collected. The callback does not own a reference to error. */ +void grpc_tcp_set_write_timestamps_callback(void (*fn)(void*, + grpc_core::Timestamps*, + grpc_error* error)); + +}; /* namespace grpc_core */ + +#endif /* GRPC_CORE_LIB_IOMGR_BUFFER_LIST_H */ diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 92e7930111..44fb47e19d 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -28,8 +28,8 @@ void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices, } void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - ep->vtable->write(ep, slices, cb); + grpc_closure* cb, void* arg) { + ep->vtable->write(ep, slices, cb, arg); } void grpc_endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 15db1649fa..1f590a80ca 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -33,10 +33,12 @@ typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; +class Timestamps; struct grpc_endpoint_vtable { void (*read)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); - void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb); + void (*write)(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, + void* arg); void (*add_to_pollset)(grpc_endpoint* ep, grpc_pollset* pollset); void (*add_to_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset); void (*delete_from_pollset_set)(grpc_endpoint* ep, grpc_pollset_set* pollset); @@ -70,9 +72,11 @@ int grpc_endpoint_get_fd(grpc_endpoint* ep); \a slices may be mutated at will by the endpoint until cb is called. No guarantee is made to the content of slices after a write EXCEPT that it is a valid slice buffer. + \a arg is platform specific. It is currently only used by TCP on linux + platforms as an argument that would be forwarded to the timestamps callback. */ void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb); + grpc_closure* cb, void* arg); /* Causes any pending and future read/write callbacks to run immediately with success==0 */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc index c3bc0cc8fd..df2cf508c8 100644 --- a/src/core/lib/iomgr/endpoint_cfstream.cc +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -268,7 +268,7 @@ static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, } static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 5c5c246f99..3afbfd7254 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, true), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, true), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 0205363d5c..d4377e2d50 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -237,14 +237,19 @@ void grpc_event_engine_shutdown(void) { } bool grpc_event_engine_can_track_errors(void) { +/* Only track errors if platform supports errqueue. */ +#ifdef GRPC_LINUX_ERRQUEUE return g_event_engine->can_track_err; +#else + return false; +#endif /* GRPC_LINUX_ERRQUEUE */ } grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); - GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err); - return g_event_engine->fd_create(fd, name, track_err); + return g_event_engine->fd_create(fd, name, + track_err && g_event_engine->can_track_err); } int grpc_fd_wrapped_fd(grpc_fd* fd) { diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc new file mode 100644 index 0000000000..8823737e49 --- /dev/null +++ b/src/core/lib/iomgr/internal_errqueue.cc @@ -0,0 +1,40 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +#include "src/core/lib/iomgr/port.h" + +#include "src/core/lib/iomgr/internal_errqueue.h" + +#ifdef GRPC_POSIX_SOCKET_TCP + +#ifdef GPR_LINUX +#include +#endif /* GPR_LINUX */ + +bool kernel_supports_errqueue() { +#ifdef LINUX_VERSION_CODE +#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) + return true; +#endif /* LINUX_VERSION_CODE <= KERNEL_VERSION(4, 0, 0) */ +#endif /* LINUX_VERSION_CODE */ + return false; +} + +#endif /* GRPC_POSIX_SOCKET_TCP */ diff --git a/src/core/lib/iomgr/internal_errqueue.h b/src/core/lib/iomgr/internal_errqueue.h new file mode 100644 index 0000000000..fc11be9a6d --- /dev/null +++ b/src/core/lib/iomgr/internal_errqueue.h @@ -0,0 +1,62 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* This file contains constants defined in and + * so as to allow collecting network timestamps in the + * kernel. This file allows tcp_posix.cc to compile on platforms that do not + * have and . + */ + +#ifndef GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H +#define GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H + +#include + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_POSIX_SOCKET_TCP + +#include +#include + +#ifdef GRPC_LINUX_ERRQUEUE +#include +#include +#include +#endif /* GRPC_LINUX_ERRQUEUE */ + +namespace grpc_core { + +#ifdef GRPC_LINUX_ERRQUEUE +constexpr uint32_t kTimestampingSocketOptions = SOF_TIMESTAMPING_SOFTWARE | + SOF_TIMESTAMPING_OPT_ID | + SOF_TIMESTAMPING_OPT_TSONLY; +constexpr uint32_t kTimestampingRecordingOptions = + SOF_TIMESTAMPING_TX_SCHED | SOF_TIMESTAMPING_TX_SOFTWARE | + SOF_TIMESTAMPING_TX_ACK; +#endif /* GRPC_LINUX_ERRQUEUE */ + +/* Returns true if kernel is capable of supporting errqueue and timestamping. + * Currently allowing only linux kernels above 4.0.0 + */ +bool kernel_supports_errqueue(); +} // namespace grpc_core + +#endif /* GRPC_POSIX_SOCKET_TCP */ + +#endif /* GRPC_CORE_LIB_IOMGR_INTERNAL_ERRQUEUE_H */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 066417b93c..a4688fd0ef 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,6 +60,12 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 +#include +#ifdef LINUX_VERSION_CODE +#if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) +#define GRPC_LINUX_ERRQUEUE 1 +#endif /* LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) */ +#endif /* LINUX_VERSION_CODE */ #define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_HOST_NAME_MAX 1 diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 296ee74311..9c989b7dfe 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -279,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args, } addr_str = grpc_sockaddr_to_uri(mapped_addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - *fdobj = grpc_fd_create(fd, name, false); + *fdobj = grpc_fd_create(fd, name, true); gpr_free(name); gpr_free(addr_str); return GRPC_ERROR_NONE; diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index 990e8d632b..e02a1898f2 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -221,7 +221,7 @@ static void custom_write_callback(grpc_custom_socket* socket, } static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep; GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD(); diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b53ffbf01c..1db2790265 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -27,7 +27,9 @@ #include #include +#include #include +#include #include #include #include @@ -46,6 +48,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" @@ -97,17 +100,42 @@ struct grpc_tcp { grpc_closure read_done_closure; grpc_closure write_done_closure; + grpc_closure error_closure; char* peer_string; grpc_resource_user* resource_user; grpc_resource_user_slice_allocator slice_allocator; + + grpc_core::TracedBuffer* tb_head; /* List of traced buffers */ + gpr_mu tb_mu; /* Lock for access to list of traced buffers */ + + /* grpc_endpoint_write takes an argument which if non-null means that the + * transport layer wants the TCP layer to collect timestamps for this write. + * This arg is forwarded to the timestamps callback function when the ACK + * timestamp is received from the kernel. This arg is a (void *) which allows + * users of this API to pass in a pointer to any kind of structure. This + * structure could actually be a tag or any book-keeping object that the user + * can use to distinguish between different traced writes. The only + * requirement from the TCP endpoint layer is that this arg should be non-null + * if the user wants timestamps for the write. */ + void* outgoing_buffer_arg; + /* A counter which starts at 0. It is initialized the first time the socket + * options for collecting timestamps are set, and is incremented with each + * byte sent. */ + int bytes_counter; + bool socket_ts_enabled; /* True if timestamping options are set on the socket + */ + gpr_atm + stop_error_notification; /* Set to 1 if we do not want to be notified on + errors anymore */ }; struct backup_poller { gpr_mu* pollset_mu; grpc_closure run_poller; }; + } // namespace #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1)) @@ -302,6 +330,7 @@ static void tcp_free(grpc_tcp* tcp) { grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); + gpr_mu_destroy(&tcp->tb_mu); gpr_free(tcp); } @@ -347,6 +376,10 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + if (grpc_event_engine_can_track_errors()) { + gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); + grpc_fd_set_error(tcp->em_fd); + } TCP_UNREF(tcp, "destroy"); } @@ -513,6 +546,234 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, } } +/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number + * of bytes sent. */ +ssize_t tcp_send(int fd, const struct msghdr* msg) { + GPR_TIMER_SCOPE("sendmsg", 1); + ssize_t sent_length; + do { + /* TODO(klempner): Cork if this is a partial write */ + GRPC_STATS_INC_SYSCALL_WRITE(); + sent_length = sendmsg(fd, msg, SENDMSG_FLAGS); + } while (sent_length < 0 && errno == EINTR); + return sent_length; +} + +/** This is to be called if outgoing_buffer_arg is not null. On linux platforms, + * this will call sendmsg with socket options set to collect timestamps inside + * the kernel. On return, sent_length is set to the return value of the sendmsg + * call. Returns false if setting the socket options failed. This is not + * implemented for non-linux platforms currently, and crashes out. + */ +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, grpc_error** error); + +/** The callback function to be invoked when we get an error on the socket. */ +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); + +#ifdef GRPC_LINUX_ERRQUEUE +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, + grpc_error** error) { + if (!tcp->socket_ts_enabled) { + uint32_t opt = grpc_core::kTimestampingSocketOptions; + if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, + static_cast(&opt), sizeof(opt)) != 0) { + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp); + grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket."); + } + return false; + } + tcp->bytes_counter = -1; + tcp->socket_ts_enabled = true; + } + /* Set control message to indicate that you want timestamps. */ + union { + char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))]; + struct cmsghdr align; + } u; + cmsghdr* cmsg = reinterpret_cast(u.cmsg_buf); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SO_TIMESTAMPING; + cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t)); + *reinterpret_cast(CMSG_DATA(cmsg)) = + grpc_core::kTimestampingRecordingOptions; + msg->msg_control = u.cmsg_buf; + msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); + + /* If there was an error on sendmsg the logic in tcp_flush will handle it. */ + ssize_t length = tcp_send(tcp->fd, msg); + *sent_length = length; + /* Only save timestamps if all the bytes were taken by sendmsg. */ + if (sending_length == static_cast(length)) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::AddNewEntry( + &tcp->tb_head, static_cast(tcp->bytes_counter + length), + tcp->outgoing_buffer_arg); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; + } + return true; +} + +/** Reads \a cmsg to derive timestamps from the control messages. If a valid + * timestamp is found, the traced buffer list is updated with this timestamp. + * The caller of this function should be looping on the control messages found + * in \a msg. \a cmsg should point to the control message that the caller wants + * processed. + * On return, a pointer to a control message is returned. On the next iteration, + * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */ +struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, + struct cmsghdr* cmsg) { + auto next_cmsg = CMSG_NXTHDR(msg, cmsg); + if (next_cmsg == nullptr) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Received timestamp without extended error"); + } + return cmsg; + } + + if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) || + !(next_cmsg->cmsg_type == IP_RECVERR || + next_cmsg->cmsg_type == IPV6_RECVERR)) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Unexpected control message"); + } + return cmsg; + } + + auto tss = reinterpret_cast(CMSG_DATA(cmsg)); + auto serr = reinterpret_cast(CMSG_DATA(next_cmsg)); + if (serr->ee_errno != ENOMSG || + serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { + gpr_log(GPR_ERROR, "Unexpected control message"); + return cmsg; + } + /* The error handling can potentially be done on another thread so we need + * to protect the traced buffer list. A lock free list might be better. Using + * a simple mutex for now. */ + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss); + gpr_mu_unlock(&tcp->tb_mu); + return next_cmsg; +} + +/** For linux platforms, reads the socket's error queue and processes error + * messages from the queue. Returns true if all the errors processed were + * timestamps. Returns false if any of the errors were not timestamps. For + * non-linux platforms, error processing is not used/enabled currently. + */ +static bool process_errors(grpc_tcp* tcp) { + while (true) { + struct iovec iov; + iov.iov_base = nullptr; + iov.iov_len = 0; + struct msghdr msg; + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 0; + msg.msg_flags = 0; + + union { + char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) + + CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/]; + struct cmsghdr align; + } aligned_buf; + memset(&aligned_buf, 0, sizeof(aligned_buf)); + + msg.msg_control = aligned_buf.rbuf; + msg.msg_controllen = sizeof(aligned_buf.rbuf); + + int r, saved_errno; + do { + r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE); + saved_errno = errno; + } while (r < 0 && saved_errno == EINTR); + + if (r == -1 && saved_errno == EAGAIN) { + return true; /* No more errors to process */ + } + if (r == -1) { + return false; + } + if (grpc_tcp_trace.enabled()) { + if ((msg.msg_flags & MSG_CTRUNC) == 1) { + gpr_log(GPR_INFO, "Error message was truncated."); + } + } + + if (msg.msg_controllen == 0) { + /* There was no control message found. It was probably spurious. */ + return true; + } + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET || + cmsg->cmsg_type != SCM_TIMESTAMPING) { + /* Got a control message that is not a timestamp. Don't know how to + * handle this. */ + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, + "unknown control message cmsg_level:%d cmsg_type:%d", + cmsg->cmsg_level, cmsg->cmsg_type); + } + return false; + } + process_timestamp(tcp, &msg, cmsg); + } + } +} + +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + grpc_tcp* tcp = static_cast(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); + } + + if (error != GRPC_ERROR_NONE || + static_cast(gpr_atm_acq_load(&tcp->stop_error_notification))) { + /* We aren't going to register to hear on error anymore, so it is safe to + * unref. */ + grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "error-tracking"); + return; + } + + /* We are still interested in collecting timestamps, so let's try reading + * them. */ + if (!process_errors(tcp)) { + /* This was not a timestamps error. This was an actual error. Set the + * read and write closures to be ready. + */ + grpc_fd_set_readable(tcp->em_fd); + grpc_fd_set_writable(tcp->em_fd); + } + GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); +} + +#else /* GRPC_LINUX_ERRQUEUE */ +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, + grpc_error** error) { + gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); + GPR_ASSERT(0); + return false; +} + +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + gpr_log(GPR_ERROR, "Error handling is not supported for this platform"); + GPR_ASSERT(0); +} +#endif /* GRPC_LINUX_ERRQUEUE */ + /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -557,19 +818,20 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = iov_size; - msg.msg_control = nullptr; - msg.msg_controllen = 0; msg.msg_flags = 0; + if (tcp->outgoing_buffer_arg != nullptr) { + if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, + error)) + return true; /* something went wrong with timestamps */ + } else { + msg.msg_control = nullptr; + msg.msg_controllen = 0; - GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); - GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); + GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); - GPR_TIMER_SCOPE("sendmsg", 1); - do { - /* TODO(klempner): Cork if this is a partial write */ - GRPC_STATS_INC_SYSCALL_WRITE(); - sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); - } while (sent_length < 0 && errno == EINTR); + sent_length = tcp_send(tcp->fd, &msg); + } if (sent_length < 0) { if (errno == EAGAIN) { @@ -593,6 +855,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } GPR_ASSERT(tcp->outgoing_byte_idx == 0); + tcp->bytes_counter += sent_length; trailing = sending_length - static_cast(sent_length); while (trailing > 0) { size_t slice_length; @@ -607,7 +870,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { trailing -= slice_length; } } - if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); @@ -640,14 +902,13 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("tcp_write", 0); grpc_tcp* tcp = reinterpret_cast(ep); grpc_error* error = GRPC_ERROR_NONE; @@ -675,6 +936,10 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; + tcp->outgoing_buffer_arg = arg; + if (arg) { + GPR_ASSERT(grpc_event_engine_can_track_errors()); + } if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); @@ -792,6 +1057,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->bytes_read_this_round = 0; /* Will be set to false by the very first endpoint read function */ tcp->is_first_read = true; + tcp->bytes_counter = -1; + tcp->socket_ts_enabled = false; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -803,6 +1070,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); grpc_resource_quota_unref_internal(resource_quota); + gpr_mu_init(&tcp->tb_mu); + tcp->tb_head = nullptr; + /* Start being notified on errors if event engine can track errors. */ + if (grpc_event_engine_can_track_errors()) { + /* Grab a ref to tcp so that we can safely access the tcp struct when + * processing errors. We unref when we no longer want to track errors + * separately. */ + TCP_REF(tcp, "error-tracking"); + gpr_atm_rel_store(&tcp->stop_error_notification, 0); + GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); + } return &tcp->base; } @@ -821,6 +1101,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, tcp->release_fd = fd; tcp->release_fd_cb = done; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + if (grpc_event_engine_can_track_errors()) { + /* Stop errors notification. */ + gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); + grpc_fd_set_error(tcp->em_fd); + } TCP_UNREF(tcp, "destroy"); } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index af89bd24db..eff825cb92 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -31,7 +31,10 @@ #include +#include "src/core/lib/iomgr/port.h" + #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 8ddf684fea..824db07fbf 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str); } - grpc_fd* fdobj = grpc_fd_create(fd, name, false); + grpc_fd* fdobj = grpc_fd_create(fd, name, true); read_notifier_pollset = sp->server->pollsets[static_cast(gpr_atm_no_barrier_fetch_add( @@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) { listener->sibling = sp; sp->server = listener->server; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name, false); + sp->emfd = grpc_fd_create(fd, name, true); memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = listener->port_index; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index b9f8145572..9595c028ce 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd, s->tail = sp; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name, false); + sp->emfd = grpc_fd_create(fd, name, true); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index b3cb442f18..64c4a56ae9 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -296,7 +296,7 @@ static void on_write(void* tcpp, grpc_error* error) { /* Initiates a write. */ static void win_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { grpc_tcp* tcp = (grpc_tcp*)ep; grpc_winsocket* socket = tcp->socket; grpc_winsocket_callback_info* info = &socket->write_info; diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index bdb2d0e764..3dd7cab855 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); - emfd_ = grpc_fd_create(fd, name, false); + emfd_ = grpc_fd_create(fd, name, true); memcpy(&addr_, addr, sizeof(grpc_resolved_address)); GPR_ASSERT(emfd_); gpr_free(name); diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index 840b2e73bc..f40f969bb7 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -254,7 +254,7 @@ static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur, } static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("secure_endpoint.endpoint_write", 0); unsigned i; @@ -342,7 +342,7 @@ static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices, return; } - grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb); + grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg); } static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error* why) { diff --git a/src/core/lib/security/transport/security_handshaker.cc b/src/core/lib/security/transport/security_handshaker.cc index aff723ed04..d76d582638 100644 --- a/src/core/lib/security/transport/security_handshaker.cc +++ b/src/core/lib/security/transport/security_handshaker.cc @@ -259,7 +259,7 @@ static grpc_error* on_handshake_next_done_locked( grpc_slice_buffer_reset_and_unref_internal(&h->outgoing); grpc_slice_buffer_add(&h->outgoing, to_send); grpc_endpoint_write(h->args->endpoint, &h->outgoing, - &h->on_handshake_data_sent_to_peer); + &h->on_handshake_data_sent_to_peer, nullptr); } else if (handshaker_result == nullptr) { // There is nothing to send, but need to read from peer. grpc_endpoint_read(h->args->endpoint, h->args->read_buffer, diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 6e6d756eec..0f68e823d7 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -82,6 +82,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/http/format_request.cc', 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', + 'src/core/lib/iomgr/buffer_list.cc', 'src/core/lib/iomgr/call_combiner.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', @@ -102,6 +103,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/gethostname_fallback.cc', 'src/core/lib/iomgr/gethostname_host_name_max.cc', 'src/core/lib/iomgr/gethostname_sysconf.cc', + 'src/core/lib/iomgr/internal_errqueue.cc', 'src/core/lib/iomgr/iocp_windows.cc', 'src/core/lib/iomgr/iomgr.cc', 'src/core/lib/iomgr/iomgr_custom.cc', diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc index c03ebcf409..ade23133c5 100644 --- a/test/core/bad_client/bad_client.cc +++ b/test/core/bad_client/bad_client.cc @@ -115,7 +115,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags, grpc_schedule_on_exec_ctx); /* Write data */ - grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure); + grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr); grpc_core::ExecCtx::Get()->Flush(); /* Await completion, unless the request is large and write may not finish diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc index 3d133cfc18..f7396a1684 100644 --- a/test/core/end2end/bad_server_response_test.cc +++ b/test/core/end2end/bad_server_response_test.cc @@ -104,7 +104,7 @@ static void handle_write() { grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer); grpc_slice_buffer_add(&state.outgoing_buffer, slice); - grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write); + grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr); } static void handle_read(void* arg, grpc_error* error) { diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index f02fa9d998..ea9c000efb 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -201,7 +201,7 @@ static void on_client_write_done(void* arg, grpc_error* error) { &conn->client_write_buffer); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done); + &conn->on_client_write_done, nullptr); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "write_done"); @@ -226,7 +226,7 @@ static void on_server_write_done(void* arg, grpc_error* error) { &conn->server_write_buffer); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done); + &conn->on_server_write_done, nullptr); } else { // No more writes. Unref the connection. proxy_connection_unref(conn, "server_write"); @@ -257,7 +257,7 @@ static void on_client_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "client_read"); conn->server_is_writing = true; grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer, - &conn->on_server_write_done); + &conn->on_server_write_done, nullptr); } // Read more data. grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer, @@ -288,7 +288,7 @@ static void on_server_read_done(void* arg, grpc_error* error) { proxy_connection_ref(conn, "server_read"); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_client_write_done); + &conn->on_client_write_done, nullptr); } // Read more data. grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer, @@ -340,7 +340,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) { grpc_slice_buffer_add(&conn->client_write_buffer, slice); conn->client_is_writing = true; grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer, - &conn->on_write_response_done); + &conn->on_write_response_done, nullptr); } /** diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD index 002671a5fa..675d9e6278 100644 --- a/test/core/iomgr/BUILD +++ b/test/core/iomgr/BUILD @@ -246,6 +246,19 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "buffer_list_test", + srcs = ["buffer_list_test.cc"], + language = "C++", + deps = [ + "//:gpr", + "//:grpc", + "//test/core/util:gpr_test_util", + "//test/core/util:grpc_test_util", + ], +) + + grpc_cc_test( name = "tcp_server_posix_test", srcs = ["tcp_server_posix_test.cc"], diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc new file mode 100644 index 0000000000..f1773580bd --- /dev/null +++ b/test/core/iomgr/buffer_list_test.cc @@ -0,0 +1,111 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#include "src/core/lib/iomgr/buffer_list.h" + +#include + +#include "test/core/util/test_config.h" + +#ifdef GRPC_LINUX_ERRQUEUE + +static void TestShutdownFlushesListVerifier(void* arg, + grpc_core::Timestamps* ts, + grpc_error* error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(arg != nullptr); + gpr_atm* done = reinterpret_cast(arg); + gpr_atm_rel_store(done, static_cast(1)); +} + +/** Tests that all TracedBuffer elements in the list are flushed out on + * shutdown. + * Also tests that arg is passed correctly. + */ +static void TestShutdownFlushesList() { + grpc_core::grpc_tcp_set_write_timestamps_callback( + TestShutdownFlushesListVerifier); + grpc_core::TracedBuffer* list = nullptr; +#define NUM_ELEM 5 + gpr_atm verifier_called[NUM_ELEM]; + for (auto i = 0; i < NUM_ELEM; i++) { + gpr_atm_rel_store(&verifier_called[i], static_cast(0)); + grpc_core::TracedBuffer::AddNewEntry( + &list, i, static_cast(&verifier_called[i])); + } + grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); + GPR_ASSERT(list == nullptr); + for (auto i = 0; i < NUM_ELEM; i++) { + GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) == + static_cast(1)); + } +} + +static void TestVerifierCalledOnAckVerifier(void* arg, + grpc_core::Timestamps* ts, + grpc_error* error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(arg != nullptr); + GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); + GPR_ASSERT(ts->acked_time.tv_sec == 123); + GPR_ASSERT(ts->acked_time.tv_nsec == 456); + gpr_atm* done = reinterpret_cast(arg); + gpr_atm_rel_store(done, static_cast(1)); +} + +/** Tests that the timestamp verifier is called on an ACK timestamp. + */ +static void TestVerifierCalledOnAck() { + struct sock_extended_err serr; + serr.ee_data = 213; + serr.ee_info = SCM_TSTAMP_ACK; + struct scm_timestamping tss; + tss.ts[0].tv_sec = 123; + tss.ts[0].tv_nsec = 456; + grpc_core::grpc_tcp_set_write_timestamps_callback( + TestVerifierCalledOnAckVerifier); + grpc_core::TracedBuffer* list = nullptr; + gpr_atm verifier_called; + gpr_atm_rel_store(&verifier_called, static_cast(0)); + grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called); + grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss); + GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast(1)); + GPR_ASSERT(list == nullptr); + grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE); +} + +static void TestTcpBufferList() { + TestVerifierCalledOnAck(); + TestShutdownFlushesList(); +} + +int main(int argc, char** argv) { + grpc_test_init(argc, argv); + grpc_init(); + TestTcpBufferList(); + grpc_shutdown(); + return 0; +} + +#else /* GRPC_LINUX_ERRQUEUE */ + +int main(int argc, char** argv) { return 0; } + +#endif /* GRPC_LINUX_ERRQUEUE */ diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc index 8db8ac5ed6..a9e8ba86c5 100644 --- a/test/core/iomgr/endpoint_tests.cc +++ b/test/core/iomgr/endpoint_tests.cc @@ -150,8 +150,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) { &state->current_write_data); grpc_slice_buffer_reset_and_unref(&state->outgoing); grpc_slice_buffer_addn(&state->outgoing, slices, nslices); - grpc_endpoint_write(state->write_ep, &state->outgoing, - &state->done_write); + grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write, + nullptr); gpr_free(slices); return; } @@ -294,7 +294,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) { grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a")); grpc_endpoint_write(f.client_ep, &slice_buffer, GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count, - grpc_schedule_on_exec_ctx)); + grpc_schedule_on_exec_ctx), + nullptr); wait_for_fail_count(&fail_count, 3); grpc_endpoint_shutdown(f.client_ep, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown")); diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index 3e87831e44..6447cc234d 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -36,6 +36,9 @@ #include #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/buffer_list.h" +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/sockaddr_posix.h" #include "src/core/lib/slice/slice_internal.h" #include "test/core/iomgr/endpoint_tests.h" #include "test/core/util/test_config.h" @@ -68,6 +71,43 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); } +static void create_inet_sockets(int sv[2]) { + /* Prepare listening socket */ + struct sockaddr_in addr; + memset(&addr, 0, sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + int sock = socket(AF_INET, SOCK_STREAM, 0); + GPR_ASSERT(sock); + GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0); + listen(sock, 1); + + /* Prepare client socket and connect to server */ + socklen_t len = sizeof(sockaddr_in); + GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0); + + int client = socket(AF_INET, SOCK_STREAM, 0); + GPR_ASSERT(client); + int ret; + do { + ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in)); + } while (ret == -1 && errno == EINTR); + + /* Accept client connection */ + len = sizeof(socklen_t); + int server; + do { + server = accept(sock, (sockaddr*)&addr, (socklen_t*)&len); + } while (server == -1 && errno == EINTR); + GPR_ASSERT(server != -1); + + sv[0] = server; + sv[1] = client; + int flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); +} + static ssize_t fill_socket(int fd) { ssize_t write_bytes; ssize_t total_bytes = 0; @@ -289,11 +329,10 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size, static void write_done(void* user_data /* write_socket_state */, grpc_error* error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); struct write_socket_state* state = static_cast(user_data); - gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(g_mu); - gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; GPR_ASSERT( GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); @@ -340,10 +379,24 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { gpr_free(buf); } +/* Verifier for timestamps callback for write_test */ +void timestamps_verifier(void* arg, grpc_core::Timestamps* ts, + grpc_error* error) { + GPR_ASSERT(error == GRPC_ERROR_NONE); + GPR_ASSERT(arg != nullptr); + GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME); + GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME); + GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME); + gpr_atm* done_timestamps = (gpr_atm*)arg; + gpr_atm_rel_store(done_timestamps, static_cast(1)); +} + /* Write to a socket using the grpc_tcp API, then drain it directly. Note that if the write does not complete immediately we need to drain the - socket in parallel with the read. */ -static void write_test(size_t num_bytes, size_t slice_size) { + socket in parallel with the read. If collect_timestamps is true, it will + try to get timestamps for the write. */ +static void write_test(size_t num_bytes, size_t slice_size, + bool collect_timestamps) { int sv[2]; grpc_endpoint* ep; struct write_socket_state state; @@ -356,19 +409,27 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); grpc_core::ExecCtx exec_ctx; + if (collect_timestamps && !grpc_event_engine_can_track_errors()) { + return; + } + gpr_log(GPR_INFO, "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR, num_bytes, slice_size); - create_sockets(sv); + if (collect_timestamps) { + create_inet_sockets(sv); + } else { + create_sockets(sv); + } grpc_arg a[1]; a[0].key = const_cast(GRPC_ARG_TCP_READ_CHUNK_SIZE); a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = static_cast(slice_size); grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args, - "test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps), + &args, "test"); grpc_endpoint_add_to_pollset(ep, g_pollset); state.ep = ep; @@ -381,18 +442,26 @@ static void write_test(size_t num_bytes, size_t slice_size) { GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_write(ep, &outgoing, &write_done_closure); + gpr_atm done_timestamps; + gpr_atm_rel_store(&done_timestamps, static_cast(0)); + grpc_endpoint_write(ep, &outgoing, &write_done_closure, + grpc_event_engine_can_track_errors() && collect_timestamps + ? (void*)&done_timestamps + : nullptr); drain_socket_blocking(sv[0], num_bytes, num_bytes); + exec_ctx.Flush(); gpr_mu_lock(g_mu); for (;;) { grpc_pollset_worker* worker = nullptr; - if (state.write_done) { + if (state.write_done && + (!(grpc_event_engine_can_track_errors() && collect_timestamps) || + gpr_atm_acq_load(&done_timestamps) == static_cast(1))) { break; } GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - + exec_ctx.Flush(); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); @@ -497,14 +566,21 @@ void run_tests(void) { large_read_test(8192); large_read_test(1); - write_test(100, 8192); - write_test(100, 1); - write_test(100000, 8192); - write_test(100000, 1); - write_test(100000, 137); + write_test(100, 8192, false); + write_test(100, 1, false); + write_test(100000, 8192, false); + write_test(100000, 1, false); + write_test(100000, 137, false); + + write_test(100, 8192, true); + write_test(100, 1, true); + write_test(100000, 8192, true); + write_test(100000, 1, true); + write_test(100, 137, true); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { - write_test(40320, i); + write_test(40320, i, false); + write_test(40320, i, true); } release_fd_test(100, 8192); @@ -549,6 +625,7 @@ int main(int argc, char** argv) { grpc_closure destroyed; grpc_test_init(argc, argv); grpc_init(); + grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier); { grpc_core::ExecCtx exec_ctx; g_pollset = static_cast(gpr_zalloc(grpc_pollset_size())); diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc index 1156cd5fc5..ef6fd62b51 100644 --- a/test/core/util/mock_endpoint.cc +++ b/test/core/util/mock_endpoint.cc @@ -55,7 +55,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices, } static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { mock_endpoint* m = reinterpret_cast(ep); for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc index 5958216747..3cc8ad6fe1 100644 --- a/test/core/util/passthru_endpoint.cc +++ b/test/core/util/passthru_endpoint.cc @@ -76,7 +76,7 @@ static half* other_half(half* h) { } static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { half* m = other_half(reinterpret_cast(ep)); gpr_mu_lock(&m->parent->mu); grpc_error* error = GRPC_ERROR_NONE; diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc index f2efb049b4..62ed72a629 100644 --- a/test/core/util/trickle_endpoint.cc +++ b/test/core/util/trickle_endpoint.cc @@ -62,7 +62,7 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) { } static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { trickle_endpoint* te = reinterpret_cast(ep); gpr_mu_lock(&te->mu); GPR_ASSERT(te->write_cb == nullptr); @@ -186,7 +186,8 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) { te->last_write = now; grpc_endpoint_write( te->wrapped, &te->writing_buffer, - GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx)); + GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx), + nullptr); maybe_call_write_cb_locked(te); } } diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc index 1e9bd273aa..189923a841 100644 --- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc +++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc @@ -96,7 +96,7 @@ class DummyEndpoint : public grpc_endpoint { } static void write(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE); } diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 43ebf8cad9..e6d4b1d98f 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1067,6 +1067,7 @@ src/core/lib/http/format_request.h \ src/core/lib/http/httpcli.h \ src/core/lib/http/parser.h \ src/core/lib/iomgr/block_annotate.h \ +src/core/lib/iomgr/buffer_list.h \ src/core/lib/iomgr/call_combiner.h \ src/core/lib/iomgr/closure.h \ src/core/lib/iomgr/combiner.h \ @@ -1082,6 +1083,7 @@ src/core/lib/iomgr/ev_posix.h \ src/core/lib/iomgr/exec_ctx.h \ src/core/lib/iomgr/executor.h \ src/core/lib/iomgr/gethostname.h \ +src/core/lib/iomgr/internal_errqueue.h \ src/core/lib/iomgr/iocp_windows.h \ src/core/lib/iomgr/iomgr.h \ src/core/lib/iomgr/iomgr_custom.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index c1706fd070..7cd1dc7bf3 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1159,6 +1159,8 @@ src/core/lib/http/parser.cc \ src/core/lib/http/parser.h \ src/core/lib/iomgr/README.md \ src/core/lib/iomgr/block_annotate.h \ +src/core/lib/iomgr/buffer_list.cc \ +src/core/lib/iomgr/buffer_list.h \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/call_combiner.h \ src/core/lib/iomgr/closure.h \ @@ -1194,6 +1196,8 @@ src/core/lib/iomgr/gethostname.h \ src/core/lib/iomgr/gethostname_fallback.cc \ src/core/lib/iomgr/gethostname_host_name_max.cc \ src/core/lib/iomgr/gethostname_sysconf.cc \ +src/core/lib/iomgr/internal_errqueue.cc \ +src/core/lib/iomgr/internal_errqueue.h \ src/core/lib/iomgr/iocp_windows.cc \ src/core/lib/iomgr/iocp_windows.h \ src/core/lib/iomgr/iomgr.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 5014aea9cd..8ea5126fde 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -163,6 +163,23 @@ "third_party": false, "type": "target" }, + { + "deps": [ + "gpr", + "gpr_test_util", + "grpc", + "grpc_test_util" + ], + "headers": [], + "is_filegroup": false, + "language": "c", + "name": "buffer_list_test", + "src": [ + "test/core/iomgr/buffer_list_test.cc" + ], + "third_party": false, + "type": "target" + }, { "deps": [ "gpr", @@ -9488,6 +9505,7 @@ "src/core/lib/http/format_request.cc", "src/core/lib/http/httpcli.cc", "src/core/lib/http/parser.cc", + "src/core/lib/iomgr/buffer_list.cc", "src/core/lib/iomgr/call_combiner.cc", "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/endpoint.cc", @@ -9508,6 +9526,7 @@ "src/core/lib/iomgr/gethostname_fallback.cc", "src/core/lib/iomgr/gethostname_host_name_max.cc", "src/core/lib/iomgr/gethostname_sysconf.cc", + "src/core/lib/iomgr/internal_errqueue.cc", "src/core/lib/iomgr/iocp_windows.cc", "src/core/lib/iomgr/iomgr.cc", "src/core/lib/iomgr/iomgr_custom.cc", @@ -9667,6 +9686,7 @@ "src/core/lib/http/httpcli.h", "src/core/lib/http/parser.h", "src/core/lib/iomgr/block_annotate.h", + "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/combiner.h", @@ -9682,6 +9702,7 @@ "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", + "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", @@ -9817,6 +9838,7 @@ "src/core/lib/http/httpcli.h", "src/core/lib/http/parser.h", "src/core/lib/iomgr/block_annotate.h", + "src/core/lib/iomgr/buffer_list.h", "src/core/lib/iomgr/call_combiner.h", "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/combiner.h", @@ -9832,6 +9854,7 @@ "src/core/lib/iomgr/exec_ctx.h", "src/core/lib/iomgr/executor.h", "src/core/lib/iomgr/gethostname.h", + "src/core/lib/iomgr/internal_errqueue.h", "src/core/lib/iomgr/iocp_windows.h", "src/core/lib/iomgr/iomgr.h", "src/core/lib/iomgr/iomgr_custom.h", diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 51c7b57d8a..fba76d69d1 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -195,6 +195,26 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [ + "uv" + ], + "flaky": false, + "gtest": false, + "language": "c", + "name": "buffer_list_test", + "platforms": [ + "linux" + ], + "uses_polling": true + }, { "args": [], "benchmark": false, -- cgit v1.2.3 From 0cb982974f12ead39b6a8969949f2fbbe59cf3cf Mon Sep 17 00:00:00 2001 From: Yash Tibrewal Date: Mon, 27 Aug 2018 15:10:25 -0700 Subject: Remove linux/version.h include from port.h --- src/core/lib/iomgr/internal_errqueue.cc | 4 ---- src/core/lib/iomgr/port.h | 1 - 2 files changed, 5 deletions(-) (limited to 'src/core/lib/iomgr/port.h') diff --git a/src/core/lib/iomgr/internal_errqueue.cc b/src/core/lib/iomgr/internal_errqueue.cc index 8823737e49..99c22e9055 100644 --- a/src/core/lib/iomgr/internal_errqueue.cc +++ b/src/core/lib/iomgr/internal_errqueue.cc @@ -24,10 +24,6 @@ #ifdef GRPC_POSIX_SOCKET_TCP -#ifdef GPR_LINUX -#include -#endif /* GPR_LINUX */ - bool kernel_supports_errqueue() { #ifdef LINUX_VERSION_CODE #if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index a4688fd0ef..abf96662f5 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -60,7 +60,6 @@ #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 #define GRPC_HAVE_UNIX_SOCKET 1 -#include #ifdef LINUX_VERSION_CODE #if LINUX_VERSION_CODE >= KERNEL_VERSION(4, 0, 0) #define GRPC_LINUX_ERRQUEUE 1 -- cgit v1.2.3