diff options
author | Muxi Yan <mxyan@google.com> | 2018-05-30 15:42:31 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2018-05-30 16:00:06 -0700 |
commit | 142cbb59480794980e53062c846bcc2d7512548a (patch) | |
tree | b393255c68f2f1141e703d2cb1bc70312bd1eb97 | |
parent | a6e8d0ba24a684713b879eb18e7ce5313f8030cf (diff) |
Address comments on names and comments
26 files changed, 687 insertions, 677 deletions
@@ -696,8 +696,10 @@ grpc_cc_library( "src/core/lib/http/httpcli.cc", "src/core/lib/http/parser.cc", "src/core/lib/iomgr/call_combiner.cc", + "src/core/lib/iomgr/cfstream_handle.cc", "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/endpoint.cc", + "src/core/lib/iomgr/endpoint_cfstream.cc", "src/core/lib/iomgr/endpoint_pair_posix.cc", "src/core/lib/iomgr/endpoint_pair_uv.cc", "src/core/lib/iomgr/endpoint_pair_windows.cc", @@ -747,8 +749,6 @@ grpc_cc_library( "src/core/lib/iomgr/socket_utils_posix.cc", "src/core/lib/iomgr/socket_utils_windows.cc", "src/core/lib/iomgr/socket_windows.cc", - "src/core/lib/iomgr/tcp_cfstream.cc", - "src/core/lib/iomgr/tcp_cfstream_sync.cc", "src/core/lib/iomgr/tcp_client.cc", "src/core/lib/iomgr/tcp_client_cfstream.cc", "src/core/lib/iomgr/tcp_client_custom.cc", @@ -849,9 +849,11 @@ grpc_cc_library( "src/core/lib/http/parser.h", "src/core/lib/iomgr/block_annotate.h", "src/core/lib/iomgr/call_combiner.h", + "src/core/lib/iomgr/cfstream_handle.h", "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/combiner.h", "src/core/lib/iomgr/endpoint.h", + "src/core/lib/iomgr/endpoint_cfstream.h", "src/core/lib/iomgr/endpoint_pair.h", "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/error_apple.h", @@ -897,8 +899,6 @@ grpc_cc_library( "src/core/lib/iomgr/socket_utils_posix.h", "src/core/lib/iomgr/socket_windows.h", "src/core/lib/iomgr/sys_epoll_wrapper.h", - "src/core/lib/iomgr/tcp_cfstream.h", - "src/core/lib/iomgr/tcp_cfstream_sync.h", "src/core/lib/iomgr/tcp_client.h", "src/core/lib/iomgr/tcp_client_posix.h", "src/core/lib/iomgr/tcp_custom.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index 07d008343a..0f66a716e2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -941,8 +941,10 @@ add_library(grpc src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc + src/core/lib/iomgr/cfstream_handle.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc + src/core/lib/iomgr/endpoint_cfstream.cc src/core/lib/iomgr/endpoint_pair_posix.cc src/core/lib/iomgr/endpoint_pair_uv.cc src/core/lib/iomgr/endpoint_pair_windows.cc @@ -994,8 +996,6 @@ add_library(grpc src/core/lib/iomgr/socket_utils_uv.cc src/core/lib/iomgr/socket_utils_windows.cc src/core/lib/iomgr/socket_windows.cc - src/core/lib/iomgr/tcp_cfstream.cc - src/core/lib/iomgr/tcp_cfstream_sync.cc src/core/lib/iomgr/tcp_client.cc src/core/lib/iomgr/tcp_client_cfstream.cc src/core/lib/iomgr/tcp_client_custom.cc @@ -1338,8 +1338,10 @@ add_library(grpc_cronet src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc + src/core/lib/iomgr/cfstream_handle.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc + src/core/lib/iomgr/endpoint_cfstream.cc src/core/lib/iomgr/endpoint_pair_posix.cc src/core/lib/iomgr/endpoint_pair_uv.cc src/core/lib/iomgr/endpoint_pair_windows.cc @@ -1391,8 +1393,6 @@ add_library(grpc_cronet src/core/lib/iomgr/socket_utils_uv.cc src/core/lib/iomgr/socket_utils_windows.cc src/core/lib/iomgr/socket_windows.cc - src/core/lib/iomgr/tcp_cfstream.cc - src/core/lib/iomgr/tcp_cfstream_sync.cc src/core/lib/iomgr/tcp_client.cc src/core/lib/iomgr/tcp_client_cfstream.cc src/core/lib/iomgr/tcp_client_custom.cc @@ -1727,8 +1727,10 @@ add_library(grpc_test_util src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc + src/core/lib/iomgr/cfstream_handle.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc + src/core/lib/iomgr/endpoint_cfstream.cc src/core/lib/iomgr/endpoint_pair_posix.cc src/core/lib/iomgr/endpoint_pair_uv.cc src/core/lib/iomgr/endpoint_pair_windows.cc @@ -1780,8 +1782,6 @@ add_library(grpc_test_util src/core/lib/iomgr/socket_utils_uv.cc src/core/lib/iomgr/socket_utils_windows.cc src/core/lib/iomgr/socket_windows.cc - src/core/lib/iomgr/tcp_cfstream.cc - src/core/lib/iomgr/tcp_cfstream_sync.cc src/core/lib/iomgr/tcp_client.cc src/core/lib/iomgr/tcp_client_cfstream.cc src/core/lib/iomgr/tcp_client_custom.cc @@ -2035,8 +2035,10 @@ add_library(grpc_test_util_unsecure src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc + src/core/lib/iomgr/cfstream_handle.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc + src/core/lib/iomgr/endpoint_cfstream.cc src/core/lib/iomgr/endpoint_pair_posix.cc src/core/lib/iomgr/endpoint_pair_uv.cc src/core/lib/iomgr/endpoint_pair_windows.cc @@ -2088,8 +2090,6 @@ add_library(grpc_test_util_unsecure src/core/lib/iomgr/socket_utils_uv.cc src/core/lib/iomgr/socket_utils_windows.cc src/core/lib/iomgr/socket_windows.cc - src/core/lib/iomgr/tcp_cfstream.cc - src/core/lib/iomgr/tcp_cfstream_sync.cc src/core/lib/iomgr/tcp_client.cc src/core/lib/iomgr/tcp_client_cfstream.cc src/core/lib/iomgr/tcp_client_custom.cc @@ -2322,8 +2322,10 @@ add_library(grpc_unsecure src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc + src/core/lib/iomgr/cfstream_handle.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc + src/core/lib/iomgr/endpoint_cfstream.cc src/core/lib/iomgr/endpoint_pair_posix.cc src/core/lib/iomgr/endpoint_pair_uv.cc src/core/lib/iomgr/endpoint_pair_windows.cc @@ -2375,8 +2377,6 @@ add_library(grpc_unsecure src/core/lib/iomgr/socket_utils_uv.cc src/core/lib/iomgr/socket_utils_windows.cc src/core/lib/iomgr/socket_windows.cc - src/core/lib/iomgr/tcp_cfstream.cc - src/core/lib/iomgr/tcp_cfstream_sync.cc src/core/lib/iomgr/tcp_client.cc src/core/lib/iomgr/tcp_client_cfstream.cc src/core/lib/iomgr/tcp_client_custom.cc @@ -3152,8 +3152,10 @@ add_library(grpc++_cronet src/core/lib/http/httpcli.cc src/core/lib/http/parser.cc src/core/lib/iomgr/call_combiner.cc + src/core/lib/iomgr/cfstream_handle.cc src/core/lib/iomgr/combiner.cc src/core/lib/iomgr/endpoint.cc + src/core/lib/iomgr/endpoint_cfstream.cc src/core/lib/iomgr/endpoint_pair_posix.cc src/core/lib/iomgr/endpoint_pair_uv.cc src/core/lib/iomgr/endpoint_pair_windows.cc @@ -3205,8 +3207,6 @@ add_library(grpc++_cronet src/core/lib/iomgr/socket_utils_uv.cc src/core/lib/iomgr/socket_utils_windows.cc src/core/lib/iomgr/socket_windows.cc - src/core/lib/iomgr/tcp_cfstream.cc - src/core/lib/iomgr/tcp_cfstream_sync.cc src/core/lib/iomgr/tcp_client.cc src/core/lib/iomgr/tcp_client_cfstream.cc src/core/lib/iomgr/tcp_client_custom.cc @@ -3335,8 +3335,10 @@ LIBGRPC_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ + src/core/lib/iomgr/cfstream_handle.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ + src/core/lib/iomgr/endpoint_cfstream.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ src/core/lib/iomgr/endpoint_pair_windows.cc \ @@ -3388,8 +3390,6 @@ LIBGRPC_SRC = \ src/core/lib/iomgr/socket_utils_uv.cc \ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ - src/core/lib/iomgr/tcp_cfstream.cc \ - src/core/lib/iomgr/tcp_cfstream_sync.cc \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client_cfstream.cc \ src/core/lib/iomgr/tcp_client_custom.cc \ @@ -3732,8 +3732,10 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ + src/core/lib/iomgr/cfstream_handle.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ + src/core/lib/iomgr/endpoint_cfstream.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ src/core/lib/iomgr/endpoint_pair_windows.cc \ @@ -3785,8 +3787,6 @@ LIBGRPC_CRONET_SRC = \ src/core/lib/iomgr/socket_utils_uv.cc \ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ - src/core/lib/iomgr/tcp_cfstream.cc \ - src/core/lib/iomgr/tcp_cfstream_sync.cc \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client_cfstream.cc \ src/core/lib/iomgr/tcp_client_custom.cc \ @@ -4120,8 +4120,10 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ + src/core/lib/iomgr/cfstream_handle.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ + src/core/lib/iomgr/endpoint_cfstream.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ src/core/lib/iomgr/endpoint_pair_windows.cc \ @@ -4173,8 +4175,6 @@ LIBGRPC_TEST_UTIL_SRC = \ src/core/lib/iomgr/socket_utils_uv.cc \ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ - src/core/lib/iomgr/tcp_cfstream.cc \ - src/core/lib/iomgr/tcp_cfstream_sync.cc \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client_cfstream.cc \ src/core/lib/iomgr/tcp_client_custom.cc \ @@ -4420,8 +4420,10 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ + src/core/lib/iomgr/cfstream_handle.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ + src/core/lib/iomgr/endpoint_cfstream.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ src/core/lib/iomgr/endpoint_pair_windows.cc \ @@ -4473,8 +4475,6 @@ LIBGRPC_TEST_UTIL_UNSECURE_SRC = \ src/core/lib/iomgr/socket_utils_uv.cc \ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ - src/core/lib/iomgr/tcp_cfstream.cc \ - src/core/lib/iomgr/tcp_cfstream_sync.cc \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client_cfstream.cc \ src/core/lib/iomgr/tcp_client_custom.cc \ @@ -4686,8 +4686,10 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ + src/core/lib/iomgr/cfstream_handle.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ + src/core/lib/iomgr/endpoint_cfstream.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ src/core/lib/iomgr/endpoint_pair_windows.cc \ @@ -4739,8 +4741,6 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/iomgr/socket_utils_uv.cc \ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ - src/core/lib/iomgr/tcp_cfstream.cc \ - src/core/lib/iomgr/tcp_cfstream_sync.cc \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client_cfstream.cc \ src/core/lib/iomgr/tcp_client_custom.cc \ @@ -5509,8 +5509,10 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ + src/core/lib/iomgr/cfstream_handle.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ + src/core/lib/iomgr/endpoint_cfstream.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ src/core/lib/iomgr/endpoint_pair_windows.cc \ @@ -5562,8 +5564,6 @@ LIBGRPC++_CRONET_SRC = \ src/core/lib/iomgr/socket_utils_uv.cc \ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ - src/core/lib/iomgr/tcp_cfstream.cc \ - src/core/lib/iomgr/tcp_cfstream_sync.cc \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client_cfstream.cc \ src/core/lib/iomgr/tcp_client_custom.cc \ diff --git a/build.yaml b/build.yaml index 425d72fd17..7f2fad758e 100644 --- a/build.yaml +++ b/build.yaml @@ -253,8 +253,10 @@ filegroups: - src/core/lib/http/httpcli.cc - src/core/lib/http/parser.cc - src/core/lib/iomgr/call_combiner.cc + - src/core/lib/iomgr/cfstream_handle.cc - src/core/lib/iomgr/combiner.cc - src/core/lib/iomgr/endpoint.cc + - src/core/lib/iomgr/endpoint_cfstream.cc - src/core/lib/iomgr/endpoint_pair_posix.cc - src/core/lib/iomgr/endpoint_pair_uv.cc - src/core/lib/iomgr/endpoint_pair_windows.cc @@ -306,8 +308,6 @@ filegroups: - src/core/lib/iomgr/socket_utils_uv.cc - src/core/lib/iomgr/socket_utils_windows.cc - src/core/lib/iomgr/socket_windows.cc - - src/core/lib/iomgr/tcp_cfstream.cc - - src/core/lib/iomgr/tcp_cfstream_sync.cc - src/core/lib/iomgr/tcp_client.cc - src/core/lib/iomgr/tcp_client_cfstream.cc - src/core/lib/iomgr/tcp_client_custom.cc @@ -434,9 +434,11 @@ filegroups: - src/core/lib/http/parser.h - src/core/lib/iomgr/block_annotate.h - src/core/lib/iomgr/call_combiner.h + - src/core/lib/iomgr/cfstream_handle.h - src/core/lib/iomgr/closure.h - src/core/lib/iomgr/combiner.h - src/core/lib/iomgr/endpoint.h + - src/core/lib/iomgr/endpoint_cfstream.h - src/core/lib/iomgr/endpoint_pair.h - src/core/lib/iomgr/error.h - src/core/lib/iomgr/error_apple.h @@ -481,8 +483,6 @@ filegroups: - src/core/lib/iomgr/socket_utils_posix.h - src/core/lib/iomgr/socket_windows.h - src/core/lib/iomgr/sys_epoll_wrapper.h - - src/core/lib/iomgr/tcp_cfstream.h - - src/core/lib/iomgr/tcp_cfstream_sync.h - src/core/lib/iomgr/tcp_client.h - src/core/lib/iomgr/tcp_client_posix.h - src/core/lib/iomgr/tcp_custom.h @@ -107,8 +107,10 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/http/httpcli.cc \ src/core/lib/http/parser.cc \ src/core/lib/iomgr/call_combiner.cc \ + src/core/lib/iomgr/cfstream_handle.cc \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/endpoint.cc \ + src/core/lib/iomgr/endpoint_cfstream.cc \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ src/core/lib/iomgr/endpoint_pair_windows.cc \ @@ -160,8 +162,6 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/iomgr/socket_utils_uv.cc \ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ - src/core/lib/iomgr/tcp_cfstream.cc \ - src/core/lib/iomgr/tcp_cfstream_sync.cc \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client_cfstream.cc \ src/core/lib/iomgr/tcp_client_custom.cc \ diff --git a/config.w32 b/config.w32 index c8df14eb38..376293dd4b 100644 --- a/config.w32 +++ b/config.w32 @@ -83,8 +83,10 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\http\\httpcli.cc " + "src\\core\\lib\\http\\parser.cc " + "src\\core\\lib\\iomgr\\call_combiner.cc " + + "src\\core\\lib\\iomgr\\cfstream_handle.cc " + "src\\core\\lib\\iomgr\\combiner.cc " + "src\\core\\lib\\iomgr\\endpoint.cc " + + "src\\core\\lib\\iomgr\\endpoint_cfstream.cc " + "src\\core\\lib\\iomgr\\endpoint_pair_posix.cc " + "src\\core\\lib\\iomgr\\endpoint_pair_uv.cc " + "src\\core\\lib\\iomgr\\endpoint_pair_windows.cc " + @@ -136,8 +138,6 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\iomgr\\socket_utils_uv.cc " + "src\\core\\lib\\iomgr\\socket_utils_windows.cc " + "src\\core\\lib\\iomgr\\socket_windows.cc " + - "src\\core\\lib\\iomgr\\tcp_cfstream.cc " + - "src\\core\\lib\\iomgr\\tcp_cfstream_sync.cc " + "src\\core\\lib\\iomgr\\tcp_client.cc " + "src\\core\\lib\\iomgr\\tcp_client_cfstream.cc " + "src\\core\\lib\\iomgr\\tcp_client_custom.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index 2f38ecc6b0..790ec3bd26 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -372,9 +372,11 @@ Pod::Spec.new do |s| 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', 'src/core/lib/iomgr/call_combiner.h', + 'src/core/lib/iomgr/cfstream_handle.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', 'src/core/lib/iomgr/endpoint.h', + 'src/core/lib/iomgr/endpoint_cfstream.h', 'src/core/lib/iomgr/endpoint_pair.h', 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/error_apple.h', @@ -419,8 +421,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/socket_utils_posix.h', 'src/core/lib/iomgr/socket_windows.h', 'src/core/lib/iomgr/sys_epoll_wrapper.h', - 'src/core/lib/iomgr/tcp_cfstream.h', - 'src/core/lib/iomgr/tcp_cfstream_sync.h', 'src/core/lib/iomgr/tcp_client.h', 'src/core/lib/iomgr/tcp_client_posix.h', 'src/core/lib/iomgr/tcp_custom.h', @@ -560,9 +560,11 @@ Pod::Spec.new do |s| 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', 'src/core/lib/iomgr/call_combiner.h', + 'src/core/lib/iomgr/cfstream_handle.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', 'src/core/lib/iomgr/endpoint.h', + 'src/core/lib/iomgr/endpoint_cfstream.h', 'src/core/lib/iomgr/endpoint_pair.h', 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/error_apple.h', @@ -607,8 +609,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/socket_utils_posix.h', 'src/core/lib/iomgr/socket_windows.h', 'src/core/lib/iomgr/sys_epoll_wrapper.h', - 'src/core/lib/iomgr/tcp_cfstream.h', - 'src/core/lib/iomgr/tcp_cfstream_sync.h', 'src/core/lib/iomgr/tcp_client.h', 'src/core/lib/iomgr/tcp_client_posix.h', 'src/core/lib/iomgr/tcp_custom.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 71ff31dc43..6eb5fb2ae9 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -382,9 +382,11 @@ Pod::Spec.new do |s| 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', 'src/core/lib/iomgr/call_combiner.h', + 'src/core/lib/iomgr/cfstream_handle.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', 'src/core/lib/iomgr/endpoint.h', + 'src/core/lib/iomgr/endpoint_cfstream.h', 'src/core/lib/iomgr/endpoint_pair.h', 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/error_apple.h', @@ -429,8 +431,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/socket_utils_posix.h', 'src/core/lib/iomgr/socket_windows.h', 'src/core/lib/iomgr/sys_epoll_wrapper.h', - 'src/core/lib/iomgr/tcp_cfstream.h', - 'src/core/lib/iomgr/tcp_cfstream_sync.h', 'src/core/lib/iomgr/tcp_client.h', 'src/core/lib/iomgr/tcp_client_posix.h', 'src/core/lib/iomgr/tcp_custom.h', @@ -528,8 +528,10 @@ Pod::Spec.new do |s| 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', + 'src/core/lib/iomgr/cfstream_handle.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', + 'src/core/lib/iomgr/endpoint_cfstream.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', 'src/core/lib/iomgr/endpoint_pair_uv.cc', 'src/core/lib/iomgr/endpoint_pair_windows.cc', @@ -581,8 +583,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/socket_utils_uv.cc', 'src/core/lib/iomgr/socket_utils_windows.cc', 'src/core/lib/iomgr/socket_windows.cc', - 'src/core/lib/iomgr/tcp_cfstream.cc', - 'src/core/lib/iomgr/tcp_cfstream_sync.cc', 'src/core/lib/iomgr/tcp_client.cc', 'src/core/lib/iomgr/tcp_client_cfstream.cc', 'src/core/lib/iomgr/tcp_client_custom.cc', @@ -967,9 +967,11 @@ Pod::Spec.new do |s| 'src/core/lib/http/parser.h', 'src/core/lib/iomgr/block_annotate.h', 'src/core/lib/iomgr/call_combiner.h', + 'src/core/lib/iomgr/cfstream_handle.h', 'src/core/lib/iomgr/closure.h', 'src/core/lib/iomgr/combiner.h', 'src/core/lib/iomgr/endpoint.h', + 'src/core/lib/iomgr/endpoint_cfstream.h', 'src/core/lib/iomgr/endpoint_pair.h', 'src/core/lib/iomgr/error.h', 'src/core/lib/iomgr/error_apple.h', @@ -1014,8 +1016,6 @@ Pod::Spec.new do |s| 'src/core/lib/iomgr/socket_utils_posix.h', 'src/core/lib/iomgr/socket_windows.h', 'src/core/lib/iomgr/sys_epoll_wrapper.h', - 'src/core/lib/iomgr/tcp_cfstream.h', - 'src/core/lib/iomgr/tcp_cfstream_sync.h', 'src/core/lib/iomgr/tcp_client.h', 'src/core/lib/iomgr/tcp_client_posix.h', 'src/core/lib/iomgr/tcp_custom.h', diff --git a/grpc.gemspec b/grpc.gemspec index e807b1ba70..e768a214d6 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -317,9 +317,11 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/http/parser.h ) s.files += %w( src/core/lib/iomgr/block_annotate.h ) s.files += %w( src/core/lib/iomgr/call_combiner.h ) + s.files += %w( src/core/lib/iomgr/cfstream_handle.h ) s.files += %w( src/core/lib/iomgr/closure.h ) s.files += %w( src/core/lib/iomgr/combiner.h ) s.files += %w( src/core/lib/iomgr/endpoint.h ) + s.files += %w( src/core/lib/iomgr/endpoint_cfstream.h ) s.files += %w( src/core/lib/iomgr/endpoint_pair.h ) s.files += %w( src/core/lib/iomgr/error.h ) s.files += %w( src/core/lib/iomgr/error_apple.h ) @@ -364,8 +366,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/socket_utils_posix.h ) s.files += %w( src/core/lib/iomgr/socket_windows.h ) s.files += %w( src/core/lib/iomgr/sys_epoll_wrapper.h ) - s.files += %w( src/core/lib/iomgr/tcp_cfstream.h ) - s.files += %w( src/core/lib/iomgr/tcp_cfstream_sync.h ) s.files += %w( src/core/lib/iomgr/tcp_client.h ) s.files += %w( src/core/lib/iomgr/tcp_client_posix.h ) s.files += %w( src/core/lib/iomgr/tcp_custom.h ) @@ -463,8 +463,10 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/http/httpcli.cc ) s.files += %w( src/core/lib/http/parser.cc ) s.files += %w( src/core/lib/iomgr/call_combiner.cc ) + s.files += %w( src/core/lib/iomgr/cfstream_handle.cc ) s.files += %w( src/core/lib/iomgr/combiner.cc ) s.files += %w( src/core/lib/iomgr/endpoint.cc ) + s.files += %w( src/core/lib/iomgr/endpoint_cfstream.cc ) s.files += %w( src/core/lib/iomgr/endpoint_pair_posix.cc ) s.files += %w( src/core/lib/iomgr/endpoint_pair_uv.cc ) s.files += %w( src/core/lib/iomgr/endpoint_pair_windows.cc ) @@ -516,8 +518,6 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/iomgr/socket_utils_uv.cc ) s.files += %w( src/core/lib/iomgr/socket_utils_windows.cc ) s.files += %w( src/core/lib/iomgr/socket_windows.cc ) - s.files += %w( src/core/lib/iomgr/tcp_cfstream.cc ) - s.files += %w( src/core/lib/iomgr/tcp_cfstream_sync.cc ) s.files += %w( src/core/lib/iomgr/tcp_client.cc ) s.files += %w( src/core/lib/iomgr/tcp_client_cfstream.cc ) s.files += %w( src/core/lib/iomgr/tcp_client_custom.cc ) @@ -271,8 +271,10 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', + 'src/core/lib/iomgr/cfstream_handle.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', + 'src/core/lib/iomgr/endpoint_cfstream.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', 'src/core/lib/iomgr/endpoint_pair_uv.cc', 'src/core/lib/iomgr/endpoint_pair_windows.cc', @@ -324,8 +326,6 @@ 'src/core/lib/iomgr/socket_utils_uv.cc', 'src/core/lib/iomgr/socket_utils_windows.cc', 'src/core/lib/iomgr/socket_windows.cc', - 'src/core/lib/iomgr/tcp_cfstream.cc', - 'src/core/lib/iomgr/tcp_cfstream_sync.cc', 'src/core/lib/iomgr/tcp_client.cc', 'src/core/lib/iomgr/tcp_client_cfstream.cc', 'src/core/lib/iomgr/tcp_client_custom.cc', @@ -624,8 +624,10 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', + 'src/core/lib/iomgr/cfstream_handle.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', + 'src/core/lib/iomgr/endpoint_cfstream.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', 'src/core/lib/iomgr/endpoint_pair_uv.cc', 'src/core/lib/iomgr/endpoint_pair_windows.cc', @@ -677,8 +679,6 @@ 'src/core/lib/iomgr/socket_utils_uv.cc', 'src/core/lib/iomgr/socket_utils_windows.cc', 'src/core/lib/iomgr/socket_windows.cc', - 'src/core/lib/iomgr/tcp_cfstream.cc', - 'src/core/lib/iomgr/tcp_cfstream_sync.cc', 'src/core/lib/iomgr/tcp_client.cc', 'src/core/lib/iomgr/tcp_client_cfstream.cc', 'src/core/lib/iomgr/tcp_client_custom.cc', @@ -859,8 +859,10 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', + 'src/core/lib/iomgr/cfstream_handle.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', + 'src/core/lib/iomgr/endpoint_cfstream.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', 'src/core/lib/iomgr/endpoint_pair_uv.cc', 'src/core/lib/iomgr/endpoint_pair_windows.cc', @@ -912,8 +914,6 @@ 'src/core/lib/iomgr/socket_utils_uv.cc', 'src/core/lib/iomgr/socket_utils_windows.cc', 'src/core/lib/iomgr/socket_windows.cc', - 'src/core/lib/iomgr/tcp_cfstream.cc', - 'src/core/lib/iomgr/tcp_cfstream_sync.cc', 'src/core/lib/iomgr/tcp_client.cc', 'src/core/lib/iomgr/tcp_client_cfstream.cc', 'src/core/lib/iomgr/tcp_client_custom.cc', @@ -1072,8 +1072,10 @@ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', + 'src/core/lib/iomgr/cfstream_handle.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', + 'src/core/lib/iomgr/endpoint_cfstream.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', 'src/core/lib/iomgr/endpoint_pair_uv.cc', 'src/core/lib/iomgr/endpoint_pair_windows.cc', @@ -1125,8 +1127,6 @@ 'src/core/lib/iomgr/socket_utils_uv.cc', 'src/core/lib/iomgr/socket_utils_windows.cc', 'src/core/lib/iomgr/socket_windows.cc', - 'src/core/lib/iomgr/tcp_cfstream.cc', - 'src/core/lib/iomgr/tcp_cfstream_sync.cc', 'src/core/lib/iomgr/tcp_client.cc', 'src/core/lib/iomgr/tcp_client_cfstream.cc', 'src/core/lib/iomgr/tcp_client_custom.cc', diff --git a/package.xml b/package.xml index d1794f7eed..f02bf85068 100644 --- a/package.xml +++ b/package.xml @@ -324,9 +324,11 @@ <file baseinstalldir="/" name="src/core/lib/http/parser.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/block_annotate.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/call_combiner.h" role="src" /> + <file baseinstalldir="/" name="src/core/lib/iomgr/cfstream_handle.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/closure.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/combiner.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.h" role="src" /> + <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_cfstream.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/error.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/error_apple.h" role="src" /> @@ -371,8 +373,6 @@ <file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_posix.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/socket_windows.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/sys_epoll_wrapper.h" role="src" /> - <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_cfstream.h" role="src" /> - <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_cfstream_sync.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client_posix.h" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_custom.h" role="src" /> @@ -470,8 +470,10 @@ <file baseinstalldir="/" name="src/core/lib/http/httpcli.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/http/parser.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/call_combiner.cc" role="src" /> + <file baseinstalldir="/" name="src/core/lib/iomgr/cfstream_handle.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/combiner.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint.cc" role="src" /> + <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_cfstream.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_posix.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_uv.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/endpoint_pair_windows.cc" role="src" /> @@ -523,8 +525,6 @@ <file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_uv.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/socket_utils_windows.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/socket_windows.cc" role="src" /> - <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_cfstream.cc" role="src" /> - <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_cfstream_sync.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client_cfstream.cc" role="src" /> <file baseinstalldir="/" name="src/core/lib/iomgr/tcp_client_custom.cc" role="src" /> diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc new file mode 100644 index 0000000000..543179d93e --- /dev/null +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -0,0 +1,185 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/cfstream_handle.h" + +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +void* CFStreamHandle::Retain(void* info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(info); + CFSTREAM_SYNC_REF(handle, "retain"); + return info; +} + +void CFStreamHandle::Release(void* info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(info); + CFSTREAM_SYNC_UNREF(handle, "release"); +} + +CFStreamHandle* CFStreamHandle::CreateStreamSync( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream) { + return new CFStreamHandle(read_stream, write_stream); +} + +void CFStreamHandle::ReadCallback(CFReadStreamRef stream, + CFStreamEventType type, + void* client_callback_info) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info); + CFSTREAM_SYNC_REF(handle, "read callback"); + dispatch_async( + dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle, + stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + handle->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->read_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(handle, "read callback"); + }); +} +void CFStreamHandle::WriteCallback(CFWriteStreamRef stream, + CFStreamEventType type, + void* clientCallBackInfo) { + CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo); + CFSTREAM_SYNC_REF(handle, "write callback"); + dispatch_async( + dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle, + stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + handle->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + handle->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + handle->open_event_.SetReady(); + handle->write_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(handle, "write callback"); + }); +} + +CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream) { + gpr_ref_init(&refcount_, 1); + open_event_.InitEvent(); + read_event_.InitEvent(); + write_event_.InitEvent(); + CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; + CFReadStreamSetClient( + read_stream, + kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamHandle::ReadCallback, &ctx); + CFWriteStreamSetClient( + write_stream, + kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamHandle::WriteCallback, &ctx); + CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); + CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); +} + +CFStreamHandle::~CFStreamHandle() { + open_event_.DestroyEvent(); + read_event_.DestroyEvent(); + write_event_.DestroyEvent(); +} + +void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) { + open_event_.NotifyOn(closure); +} + +void CFStreamHandle::NotifyOnRead(grpc_closure* closure) { + read_event_.NotifyOn(closure); +} + +void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) { + write_event_.NotifyOn(closure); +} + +void CFStreamHandle::Shutdown(grpc_error* error) { + open_event_.SetShutdown(GRPC_ERROR_REF(error)); + read_event_.SetShutdown(GRPC_ERROR_REF(error)); + write_event_.SetShutdown(GRPC_ERROR_REF(error)); + GRPC_ERROR_UNREF(error); +} + +void CFStreamHandle::Ref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val + 1); + } + gpr_ref(&refcount_); +} + +void CFStreamHandle::Unref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(GPR_ERROR, + "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val - 1); + } + if (gpr_unref(&refcount_)) { + delete this; + } +} + +#endif diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.h b/src/core/lib/iomgr/cfstream_handle.h index 9e1d8fbed9..f6f7ae54ed 100644 --- a/src/core/lib/iomgr/tcp_cfstream_sync.h +++ b/src/core/lib/iomgr/cfstream_handle.h @@ -16,8 +16,11 @@ * */ -#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H -#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H +/* The CFStream handle acts as an event synchronization entity for + * read/write/open/error/eos events happening on CFStream streams. */ + +#ifndef GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H +#define GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H #include <grpc/support/port_platform.h> @@ -29,14 +32,14 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/lockfree_event.h" -class CFStreamSync final { +class CFStreamHandle final { public: - static CFStreamSync* CreateStreamSync(CFReadStreamRef read_stream, - CFWriteStreamRef write_stream); - ~CFStreamSync(); - CFStreamSync(const CFReadStreamRef& ref) = delete; - CFStreamSync(CFReadStreamRef&& ref) = delete; - CFStreamSync& operator=(const CFStreamSync& rhs) = delete; + static CFStreamHandle* CreateStreamSync(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream); + ~CFStreamHandle(); + CFStreamHandle(const CFReadStreamRef& ref) = delete; + CFStreamHandle(CFReadStreamRef&& ref) = delete; + CFStreamHandle& operator=(const CFStreamHandle& rhs) = delete; void NotifyOnOpen(grpc_closure* closure); void NotifyOnRead(grpc_closure* closure); @@ -49,7 +52,7 @@ class CFStreamSync final { const char* reason = nullptr); private: - CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream); + CFStreamHandle(CFReadStreamRef read_stream, CFWriteStreamRef write_stream); static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, void* client_callback_info); static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, @@ -76,4 +79,4 @@ class CFStreamSync final { #endif -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H */ +#endif /* GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H */ diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc new file mode 100644 index 0000000000..aa4359dfb2 --- /dev/null +++ b/src/core/lib/iomgr/endpoint_cfstream.cc @@ -0,0 +1,372 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_ENDPOINT + +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/endpoint_cfstream.h" + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/cfstream_handle.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error_apple.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct { + grpc_endpoint base; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamHandle* stream_sync; + + grpc_closure* read_cb; + grpc_closure* write_cb; + grpc_slice_buffer* read_slices; + grpc_slice_buffer* write_slices; + + grpc_closure read_action; + grpc_closure write_action; + + char* peer_string; + grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; +} CFStreamEndpoint; + +static void CFStreamFree(CFStreamEndpoint* ep) { + grpc_resource_user_unref(ep->resource_user); + CFRelease(ep->read_stream); + CFRelease(ep->write_stream); + CFSTREAM_SYNC_UNREF(ep->stream_sync, "free"); + gpr_free(ep->peer_string); + gpr_free(ep); +} + +#ifndef NDEBUG +#define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__) +#define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__) +static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason, + const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, + reason, val, val - 1); + } + if (gpr_unref(&ep->refcount)) { + CFStreamFree(ep); + } +} +static void CFStreamRef(CFStreamEndpoint* ep, const char* reason, + const char* file, int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, + reason, val, val + 1); + } + gpr_ref(&ep->refcount); +} +#else +#define EP_REF(ep, reason) CFStreamRef((ep)) +#define EP_UNREF(ep, reason) CFStreamUnref((ep)) +static void CFStreamUnref(CFStreamEndpoint* ep) { + if (gpr_unref(&ep->refcount)) { + CFStreamFree(ep); + } +} +static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); } +#endif + +static grpc_error* CFStreamAnnotateError(grpc_error* src_error, + CFStreamEndpoint* ep) { + return grpc_error_set_str( + grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(ep->peer_string)); +} + +static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep, + ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < ep->read_slices->count; i++) { + char* dump = grpc_dump_slice(ep->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump); + gpr_free(dump); + } + } + grpc_closure* cb = ep->read_cb; + ep->read_cb = nullptr; + ep->read_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep, + ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg); + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write: error=%s", str); + } + grpc_closure* cb = ep->write_cb; + ep->write_cb = nullptr; + ep->write_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void ReadAction(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + GPR_ASSERT(ep->read_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, GRPC_ERROR_REF(error)); + EP_UNREF(ep, "read"); + return; + } + + GPR_ASSERT(ep->read_slices->count == 1); + grpc_slice slice = ep->read_slices->slices[0]; + size_t len = GRPC_SLICE_LENGTH(slice); + CFIndex read_size = + CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len); + if (read_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream); + if (stream_error != nullptr) { + error = CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep); + CFRelease(stream_error); + } else { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error"); + } + CallReadCb(ep, error); + EP_UNREF(ep, "read"); + } else if (read_size == 0) { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, + CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); + EP_UNREF(ep, "read"); + } else { + if (read_size < len) { + grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); + } + CallReadCb(ep, GRPC_ERROR_NONE); + EP_UNREF(ep, "read"); + } +} + +static void WriteAction(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + GPR_ASSERT(ep->write_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); + CallWriteCb(ep, GRPC_ERROR_REF(error)); + EP_UNREF(ep, "write"); + return; + } + + grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices); + size_t slice_len = GRPC_SLICE_LENGTH(slice); + CFIndex write_size = CFWriteStreamWrite( + ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); + if (write_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); + CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream); + if (stream_error != nullptr) { + error = CFStreamAnnotateError( + GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep); + CFRelease(stream_error); + } else { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed."); + } + CallWriteCb(ep, error); + EP_UNREF(ep, "write"); + } else { + if (write_size < GRPC_SLICE_LENGTH(slice)) { + grpc_slice_buffer_undo_take_first( + ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); + } + if (ep->write_slices->length > 0) { + ep->stream_sync->NotifyOnWrite(&ep->write_action); + } else { + CallWriteCb(ep, GRPC_ERROR_NONE); + EP_UNREF(ep, "write"); + } + + if (grpc_tcp_trace.enabled()) { + grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); + char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump); + gpr_free(dump); + grpc_slice_unref_internal(trace_slice); + } + } + grpc_slice_unref_internal(slice); +} + +static void CFStreamReadAllocationDone(void* arg, grpc_error* error) { + CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg); + if (error == GRPC_ERROR_NONE) { + ep->stream_sync->NotifyOnRead(&ep->read_action); + } else { + grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); + CallReadCb(ep, error); + EP_UNREF(ep, "read"); + } +} + +static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, + slices, cb, slices->length); + } + GPR_ASSERT(ep_impl->read_cb == nullptr); + ep_impl->read_cb = cb; + ep_impl->read_slices = slices; + grpc_slice_buffer_reset_and_unref_internal(slices); + grpc_resource_user_alloc_slices(&ep_impl->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + ep_impl->read_slices); + EP_REF(ep_impl, "read"); +} + +static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", + ep_impl, slices, cb, slices->length); + } + GPR_ASSERT(ep_impl->write_cb == nullptr); + ep_impl->write_cb = cb; + ep_impl->write_slices = slices; + EP_REF(ep_impl, "write"); + ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action); +} + +void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why); + } + CFReadStreamClose(ep_impl->read_stream); + CFWriteStreamClose(ep_impl->write_stream); + ep_impl->stream_sync->Shutdown(why); + grpc_resource_user_shutdown(ep_impl->resource_user); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why); + } +} + +void CFStreamDestroy(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl); + } + EP_UNREF(ep_impl, "destroy"); +} + +grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + return ep_impl->resource_user; +} + +char* CFStreamGetPeer(grpc_endpoint* ep) { + CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep); + return gpr_strdup(ep_impl->peer_string); +} + +int CFStreamGetFD(grpc_endpoint* ep) { return 0; } + +void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} +void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} +void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep, + grpc_pollset_set* pollset) {} + +static const grpc_endpoint_vtable vtable = {CFStreamRead, + CFStreamWrite, + CFStreamAddToPollset, + CFStreamAddToPollsetSet, + CFStreamDeleteFromPollsetSet, + CFStreamShutdown, + CFStreamDestroy, + CFStreamGetResourceUser, + CFStreamGetPeer, + CFStreamGetFD}; + +grpc_endpoint* grpc_cfstream_endpoint_create( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream, + const char* peer_string, grpc_resource_quota* resource_quota, + CFStreamHandle* stream_sync) { + CFStreamEndpoint* ep_impl = + static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint))); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, + "CFStream endpoint:%p create readStream:%p writeStream: %p", + ep_impl, read_stream, write_stream); + } + ep_impl->base.vtable = &vtable; + gpr_ref_init(&ep_impl->refcount, 1); + ep_impl->read_stream = read_stream; + ep_impl->write_stream = write_stream; + CFRetain(read_stream); + CFRetain(write_stream); + ep_impl->stream_sync = stream_sync; + CFSTREAM_SYNC_REF(ep_impl->stream_sync, "endpoint create"); + + ep_impl->peer_string = gpr_strdup(peer_string); + ep_impl->read_cb = nil; + ep_impl->write_cb = nil; + ep_impl->read_slices = nil; + ep_impl->write_slices = nil; + GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction, + static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction, + static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx); + ep_impl->resource_user = + grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator, + ep_impl->resource_user, + CFStreamReadAllocationDone, ep_impl); + + return &ep_impl->base; +} + +#endif /* GRPC_CFSTREAM_ENDPOINT */ diff --git a/src/core/lib/iomgr/tcp_cfstream.h b/src/core/lib/iomgr/endpoint_cfstream.h index 9f52f6a5de..ef957c1f11 100644 --- a/src/core/lib/iomgr/tcp_cfstream.h +++ b/src/core/lib/iomgr/endpoint_cfstream.h @@ -16,8 +16,8 @@ * */ -#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H -#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H +#ifndef GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H +#define GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H /* Low level TCP "bottom half" implementation, for use by transports built on top of a TCP connection. @@ -36,17 +36,14 @@ #import <CoreFoundation/CoreFoundation.h> #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/cfstream_handle.h" #include "src/core/lib/iomgr/endpoint.h" -#include "src/core/lib/iomgr/tcp_cfstream_sync.h" -extern grpc_core::TraceFlag grpc_tcp_trace; - -grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, - CFWriteStreamRef write_stream, - const char* peer_string, - grpc_resource_quota* resource_quota, - CFStreamSync* stream_sync); +grpc_endpoint* grpc_cfstream_endpoint_create( + CFReadStreamRef read_stream, CFWriteStreamRef write_stream, + const char* peer_string, grpc_resource_quota* resource_quota, + CFStreamHandle* stream_sync); #endif /* GRPC_CFSTREAM */ -#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H */ +#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H */ diff --git a/src/core/lib/iomgr/error_apple.cc b/src/core/lib/iomgr/error_apple.cc index 95d69ecee9..d7af8c377f 100644 --- a/src/core/lib/iomgr/error_apple.cc +++ b/src/core/lib/iomgr/error_apple.cc @@ -31,7 +31,8 @@ grpc_error* grpc_error_create_from_cferror(const char* file, int line, void* arg, const char* custom_desc) { CFErrorRef error = static_cast<CFErrorRef>(arg); - char buf_domain[MAX_ERROR_DESCRIPTION], buf_desc[MAX_ERROR_DESCRIPTION]; + char buf_domain[MAX_ERROR_DESCRIPTION]; + char buf_desc[MAX_ERROR_DESCRIPTION]; char* error_msg; CFErrorDomain domain = CFErrorGetDomain((error)); CFIndex code = CFErrorGetCode((error)); diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc index 9c21e1f488..1ca3d9f31f 100644 --- a/src/core/lib/iomgr/polling_entity.cc +++ b/src/core/lib/iomgr/polling_entity.cc @@ -62,6 +62,8 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst) { if (pollent->tag == GRPC_POLLS_POLLSET) { #ifdef GRPC_CFSTREAM + // CFStream does not use file destriptors. When CFStream is used, the fd + // pollset is possible to be null. if (pollent->pollent.pollset != nullptr) { grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset); } diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index a1a029e1bf..80d8e63cdd 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -99,8 +99,8 @@ #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #ifdef GRPC_CFSTREAM #define GRPC_POSIX_SOCKET_IOMGR 1 -#define GRPC_CFSTREAM_TCP 1 -#define GRPC_CFSTREAM_TCP_CLIENT 1 +#define GRPC_CFSTREAM_ENDPOINT 1 +#define GRPC_CFSTREAM_CLIENT 1 #define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 #define GRPC_POSIX_SOCKET_EV 1 #define GRPC_POSIX_SOCKET_EV_EPOLL1 1 diff --git a/src/core/lib/iomgr/tcp_cfstream.cc b/src/core/lib/iomgr/tcp_cfstream.cc deleted file mode 100644 index d232e78513..0000000000 --- a/src/core/lib/iomgr/tcp_cfstream.cc +++ /dev/null @@ -1,368 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include "src/core/lib/iomgr/port.h" - -#ifdef GRPC_CFSTREAM_TCP - -#import <CoreFoundation/CoreFoundation.h> -#import "src/core/lib/iomgr/tcp_cfstream.h" - -#include <grpc/slice_buffer.h> -#include <grpc/support/alloc.h> -#include <grpc/support/string_util.h> - -#include "src/core/lib/gpr/string.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/endpoint.h" -#include "src/core/lib/iomgr/error_apple.h" -#include "src/core/lib/iomgr/tcp_cfstream_sync.h" -#include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/slice/slice_string_helpers.h" - -extern grpc_core::TraceFlag grpc_tcp_trace; - -typedef struct { - grpc_endpoint base; - gpr_refcount refcount; - - CFReadStreamRef read_stream; - CFWriteStreamRef write_stream; - CFStreamSync* stream_sync; - - grpc_closure* read_cb; - grpc_closure* write_cb; - grpc_slice_buffer* read_slices; - grpc_slice_buffer* write_slices; - - grpc_closure read_action; - grpc_closure write_action; - CFStreamEventType read_type; - - char* peer_string; - grpc_resource_user* resource_user; - grpc_resource_user_slice_allocator slice_allocator; -} CFStreamTCP; - -static void TCPFree(CFStreamTCP* tcp) { - grpc_resource_user_unref(tcp->resource_user); - CFRelease(tcp->read_stream); - CFRelease(tcp->write_stream); - CFSTREAM_SYNC_UNREF(tcp->stream_sync, "free"); - gpr_free(tcp->peer_string); - gpr_free(tcp); -} - -#ifndef NDEBUG -#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file, - int line) { - if (grpc_tcp_trace.enabled()) { - gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, - val - 1); - } - if (gpr_unref(&tcp->refcount)) { - TCPFree(tcp); - } -} -static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file, - int line) { - if (grpc_tcp_trace.enabled()) { - gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, - val + 1); - } - gpr_ref(&tcp->refcount); -} -#else -#define TCP_REF(tcp, reason) tcp_ref((tcp)) -#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) -static void tcp_unref(CFStreamTCP* tcp) { - if (gpr_unref(&tcp->refcount)) { - TCPFree(tcp); - } -} -static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); } -#endif - -static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) { - return grpc_error_set_str( - grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE), - GRPC_ERROR_STR_TARGET_ADDRESS, - grpc_slice_from_copied_string(tcp->peer_string)); -} - -static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) { - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb, - tcp->read_cb->cb, tcp->read_cb->cb_arg); - size_t i; - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "read: error=%s", str); - - for (i = 0; i < tcp->read_slices->count; i++) { - char* dump = grpc_dump_slice(tcp->read_slices->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); - gpr_free(dump); - } - } - grpc_closure* cb = tcp->read_cb; - tcp->read_cb = nullptr; - tcp->read_slices = nullptr; - GRPC_CLOSURE_SCHED(cb, error); -} - -static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) { - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb, - tcp->write_cb->cb, tcp->write_cb->cb_arg); - const char* str = grpc_error_string(error); - gpr_log(GPR_DEBUG, "write: error=%s", str); - } - grpc_closure* cb = tcp->write_cb; - tcp->write_cb = nullptr; - tcp->write_slices = nullptr; - GRPC_CLOSURE_SCHED(cb, error); -} - -static void ReadAction(void* arg, grpc_error* error) { - CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); - GPR_ASSERT(tcp->read_cb != nullptr); - if (error) { - grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); - CallReadCB(tcp, GRPC_ERROR_REF(error)); - TCP_UNREF(tcp, "read"); - return; - } - - GPR_ASSERT(tcp->read_slices->count == 1); - grpc_slice slice = tcp->read_slices->slices[0]; - size_t len = GRPC_SLICE_LENGTH(slice); - CFIndex read_size = - CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len); - if (read_size == -1) { - grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); - CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream); - if (stream_error != nullptr) { - error = TCPAnnotateError( - GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), tcp); - CFRelease(stream_error); - } else { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error"); - } - CallReadCB(tcp, error); - TCP_UNREF(tcp, "read"); - } else if (read_size == 0) { - grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); - CallReadCB(tcp, - TCPAnnotateError( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); - TCP_UNREF(tcp, "read"); - } else { - if (read_size < len) { - grpc_slice_buffer_trim_end(tcp->read_slices, len - read_size, nullptr); - } - CallReadCB(tcp, GRPC_ERROR_NONE); - TCP_UNREF(tcp, "read"); - } -} - -static void WriteAction(void* arg, grpc_error* error) { - CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); - GPR_ASSERT(tcp->write_cb != nullptr); - if (error) { - grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); - CallWriteCB(tcp, GRPC_ERROR_REF(error)); - TCP_UNREF(tcp, "write"); - return; - } - - grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices); - size_t slice_len = GRPC_SLICE_LENGTH(slice); - CFIndex write_size = CFWriteStreamWrite( - tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); - if (write_size == -1) { - grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); - CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream); - if (stream_error != nullptr) { - error = TCPAnnotateError( - GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), tcp); - CFRelease(stream_error); - } else { - error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed."); - } - CallWriteCB(tcp, error); - TCP_UNREF(tcp, "write"); - } else { - if (write_size < GRPC_SLICE_LENGTH(slice)) { - grpc_slice_buffer_undo_take_first( - tcp->write_slices, grpc_slice_sub(slice, write_size, slice_len)); - } - if (tcp->write_slices->length > 0) { - tcp->stream_sync->NotifyOnWrite(&tcp->write_action); - } else { - CallWriteCB(tcp, GRPC_ERROR_NONE); - TCP_UNREF(tcp, "write"); - } - - if (grpc_tcp_trace.enabled()) { - grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); - char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump); - gpr_free(dump); - grpc_slice_unref_internal(trace_slice); - } - } - grpc_slice_unref_internal(slice); -} - -static void TCPReadAllocationDone(void* arg, grpc_error* error) { - CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); - if (error == GRPC_ERROR_NONE) { - tcp->stream_sync->NotifyOnRead(&tcp->read_action); - } else { - grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); - CallReadCB(tcp, error); - TCP_UNREF(tcp, "read"); - } -} - -static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb, - slices->length); - } - GPR_ASSERT(tcp->read_cb == nullptr); - tcp->read_cb = cb; - tcp->read_slices = slices; - grpc_slice_buffer_reset_and_unref_internal(slices); - grpc_resource_user_alloc_slices(&tcp->slice_allocator, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, - tcp->read_slices); - TCP_REF(tcp, "read"); -} - -static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, - grpc_closure* cb) { - CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb, - slices->length); - } - GPR_ASSERT(tcp->write_cb == nullptr); - tcp->write_cb = cb; - tcp->write_slices = slices; - TCP_REF(tcp, "write"); - tcp->stream_sync->NotifyOnWrite(&tcp->write_action); -} - -void TCPShutdown(grpc_endpoint* ep, grpc_error* why) { - CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p shutdown (%p)", tcp, why); - } - CFReadStreamClose(tcp->read_stream); - CFWriteStreamClose(tcp->write_stream); - tcp->stream_sync->Shutdown(why); - grpc_resource_user_shutdown(tcp->resource_user); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p shutdown DONE (%p)", tcp, why); - } -} - -void TCPDestroy(grpc_endpoint* ep) { - CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p destroy", tcp); - } - TCP_UNREF(tcp, "destroy"); -} - -grpc_resource_user* TCPGetResourceUser(grpc_endpoint* ep) { - CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); - return tcp->resource_user; -} - -char* TCPGetPeer(grpc_endpoint* ep) { - CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); - return gpr_strdup(tcp->peer_string); -} - -int TCPGetFD(grpc_endpoint* ep) { return 0; } - -void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} -void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} -void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} - -static const grpc_endpoint_vtable vtable = {TCPRead, - TCPWrite, - TCPAddToPollset, - TCPAddToPollsetSet, - TCPDeleteFromPollsetSet, - TCPShutdown, - TCPDestroy, - TCPGetResourceUser, - TCPGetPeer, - TCPGetFD}; - -grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, - CFWriteStreamRef write_stream, - const char* peer_string, - grpc_resource_quota* resource_quota, - CFStreamSync* stream_sync) { - CFStreamTCP* tcp = static_cast<CFStreamTCP*>(gpr_malloc(sizeof(CFStreamTCP))); - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp, - read_stream, write_stream); - } - tcp->base.vtable = &vtable; - gpr_ref_init(&tcp->refcount, 1); - tcp->read_stream = read_stream; - tcp->write_stream = write_stream; - CFRetain(read_stream); - CFRetain(write_stream); - tcp->stream_sync = stream_sync; - CFSTREAM_SYNC_REF(tcp->stream_sync, "endpoint create"); - - tcp->peer_string = gpr_strdup(peer_string); - tcp->read_cb = nil; - tcp->write_cb = nil; - tcp->read_slices = nil; - tcp->write_slices = nil; - GRPC_CLOSURE_INIT(&tcp->read_action, ReadAction, static_cast<void*>(tcp), - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast<void*>(tcp), - grpc_schedule_on_exec_ctx); - tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); - grpc_resource_user_slice_allocator_init( - &tcp->slice_allocator, tcp->resource_user, TCPReadAllocationDone, tcp); - - return &tcp->base; -} - -#endif /* GRPC_CFSTREAM_TCP */ diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.cc b/src/core/lib/iomgr/tcp_cfstream_sync.cc deleted file mode 100644 index 07ee53311c..0000000000 --- a/src/core/lib/iomgr/tcp_cfstream_sync.cc +++ /dev/null @@ -1,183 +0,0 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/support/port_platform.h> - -#include "src/core/lib/iomgr/port.h" - -#ifdef GRPC_CFSTREAM -#import <CoreFoundation/CoreFoundation.h> -#import "src/core/lib/iomgr/tcp_cfstream_sync.h" - -#include <grpc/support/atm.h> -#include <grpc/support/sync.h> - -#include "src/core/lib/debug/trace.h" -#include "src/core/lib/iomgr/closure.h" -#include "src/core/lib/iomgr/exec_ctx.h" - -extern grpc_core::TraceFlag grpc_tcp_trace; - -void* CFStreamSync::Retain(void* info) { - CFStreamSync* sync = static_cast<CFStreamSync*>(info); - CFSTREAM_SYNC_REF(sync, "retain"); - return info; -} - -void CFStreamSync::Release(void* info) { - CFStreamSync* sync = static_cast<CFStreamSync*>(info); - CFSTREAM_SYNC_UNREF(sync, "release"); -} - -CFStreamSync* CFStreamSync::CreateStreamSync(CFReadStreamRef read_stream, - CFWriteStreamRef write_stream) { - return new CFStreamSync(read_stream, write_stream); -} - -void CFStreamSync::ReadCallback(CFReadStreamRef stream, CFStreamEventType type, - void* client_callback_info) { - CFStreamSync* sync = static_cast<CFStreamSync*>(client_callback_info); - CFSTREAM_SYNC_REF(sync, "read callback"); - dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), - ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %p, %lu, %p)", - sync, stream, type, client_callback_info); - } - switch (type) { - case kCFStreamEventOpenCompleted: - sync->open_event_.SetReady(); - break; - case kCFStreamEventHasBytesAvailable: - case kCFStreamEventEndEncountered: - sync->read_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - sync->open_event_.SetReady(); - sync->read_event_.SetReady(); - break; - default: - // Impossible - abort(); - } - CFSTREAM_SYNC_UNREF(sync, "read callback"); - }); -} -void CFStreamSync::WriteCallback(CFWriteStreamRef stream, - CFStreamEventType type, - void* clientCallBackInfo) { - CFStreamSync* sync = static_cast<CFStreamSync*>(clientCallBackInfo); - CFSTREAM_SYNC_REF(sync, "write callback"); - dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), - ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %p, %lu, %p)", - sync, stream, type, clientCallBackInfo); - } - switch (type) { - case kCFStreamEventOpenCompleted: - sync->open_event_.SetReady(); - break; - case kCFStreamEventCanAcceptBytes: - case kCFStreamEventEndEncountered: - sync->write_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - sync->open_event_.SetReady(); - sync->write_event_.SetReady(); - break; - default: - // Impossible - abort(); - } - CFSTREAM_SYNC_UNREF(sync, "write callback"); - }); -} - -CFStreamSync::CFStreamSync(CFReadStreamRef read_stream, - CFWriteStreamRef write_stream) { - gpr_ref_init(&refcount_, 1); - open_event_.InitEvent(); - read_event_.InitEvent(); - write_event_.InitEvent(); - CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; - CFReadStreamSetClient( - read_stream, - kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | - kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, - CFStreamSync::ReadCallback, &ctx); - CFWriteStreamSetClient( - write_stream, - kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | - kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, - CFStreamSync::WriteCallback, &ctx); - CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), - kCFRunLoopCommonModes); - CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), - kCFRunLoopCommonModes); -} - -CFStreamSync::~CFStreamSync() { - open_event_.DestroyEvent(); - read_event_.DestroyEvent(); - write_event_.DestroyEvent(); -} - -void CFStreamSync::NotifyOnOpen(grpc_closure* closure) { - open_event_.NotifyOn(closure); -} - -void CFStreamSync::NotifyOnRead(grpc_closure* closure) { - read_event_.NotifyOn(closure); -} - -void CFStreamSync::NotifyOnWrite(grpc_closure* closure) { - write_event_.NotifyOn(closure); -} - -void CFStreamSync::Shutdown(grpc_error* error) { - open_event_.SetShutdown(GRPC_ERROR_REF(error)); - read_event_.SetShutdown(GRPC_ERROR_REF(error)); - write_event_.SetShutdown(GRPC_ERROR_REF(error)); - GRPC_ERROR_UNREF(error); -} - -void CFStreamSync::Ref(const char* file, int line, const char* reason) { - if (grpc_tcp_trace.enabled()) { - gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val, - val + 1); - } - gpr_ref(&refcount_); -} - -void CFStreamSync::Unref(const char* file, int line, const char* reason) { - if (grpc_tcp_trace.enabled()) { - gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); - gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, - reason, val, val - 1); - } - if (gpr_unref(&refcount_)) { - delete this; - } -} - -#endif diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index 7deaece904..552f81a506 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -21,7 +21,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_CFSTREAM_TCP_CLIENT +#ifdef GRPC_CFSTREAM_CLIENT #include <CoreFoundation/CoreFoundation.h> @@ -35,24 +35,24 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/iomgr/cfstream_handle.h" #include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint_cfstream.h" #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/error_apple.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/iomgr/tcp_cfstream.h" -#include "src/core/lib/iomgr/tcp_cfstream_sync.h" #include "src/core/lib/iomgr/tcp_client.h" #include "src/core/lib/iomgr/timer.h" extern grpc_core::TraceFlag grpc_tcp_trace; -typedef struct CFStreamTCPConnect { +typedef struct CFStreamConnect { gpr_mu mu; gpr_refcount refcount; CFReadStreamRef read_stream; CFWriteStreamRef write_stream; - CFStreamSync* stream_sync; + CFStreamHandle* stream_sync; grpc_timer alarm; grpc_closure on_alarm; @@ -67,9 +67,9 @@ typedef struct CFStreamTCPConnect { int refs; char* addr_name; grpc_resource_quota* resource_quota; -} CFStreamTCPConnect; +} CFStreamConnect; -static void TCPConnectCleanup(CFStreamTCPConnect* connect) { +static void CFStreamConnectCleanup(CFStreamConnect* connect) { grpc_resource_quota_unref_internal(connect->resource_quota); CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up"); CFRelease(connect->read_stream); @@ -80,7 +80,7 @@ static void TCPConnectCleanup(CFStreamTCPConnect* connect) { } static void OnAlarm(void* arg, grpc_error* error) { - CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); + CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error); } @@ -92,7 +92,7 @@ static void OnAlarm(void* arg, grpc_error* error) { // Only schedule a callback once, by either on_timer or on_connected. The // first one issues callback while the second one does cleanup. if (done) { - TCPConnectCleanup(connect); + CFStreamConnectCleanup(connect); } else { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); @@ -101,7 +101,7 @@ static void OnAlarm(void* arg, grpc_error* error) { } static void OnOpen(void* arg, grpc_error* error) { - CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); + CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error); } @@ -115,7 +115,7 @@ static void OnOpen(void* arg, grpc_error* error) { if (done) { gpr_mu_unlock(&connect->mu); - TCPConnectCleanup(connect); + CFStreamConnectCleanup(connect); } else { if (error == GRPC_ERROR_NONE) { CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream); @@ -127,9 +127,9 @@ static void OnOpen(void* arg, grpc_error* error) { CFRelease(stream_error); } if (error == GRPC_ERROR_NONE) { - *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, - connect->addr_name, connect->resource_quota, - connect->stream_sync); + *endpoint = grpc_cfstream_endpoint_create( + connect->read_stream, connect->write_stream, connect->addr_name, + connect->resource_quota, connect->stream_sync); } } gpr_mu_unlock(&connect->mu); @@ -149,14 +149,14 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr, *port = grpc_sockaddr_get_port(addr); } -static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, - grpc_pollset_set* interested_parties, - const grpc_channel_args* channel_args, - const grpc_resolved_address* resolved_addr, - grpc_millis deadline) { - CFStreamTCPConnect* connect; +static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* resolved_addr, + grpc_millis deadline) { + CFStreamConnect* connect; - connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect)); + connect = (CFStreamConnect*)gpr_zalloc(sizeof(CFStreamConnect)); connect->closure = closure; connect->endpoint = ep; connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); @@ -194,7 +194,7 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, connect->read_stream = read_stream; connect->write_stream = write_stream; connect->stream_sync = - CFStreamSync::CreateStreamSync(read_stream, write_stream); + CFStreamHandle::CreateStreamSync(read_stream, write_stream); GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), grpc_schedule_on_exec_ctx); connect->stream_sync->NotifyOnOpen(&connect->on_open); @@ -207,6 +207,6 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, gpr_mu_unlock(&connect->mu); } -grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect}; +grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect}; -#endif /* GRPC_CFSTREAM_TCP_CLIENT */ +#endif /* GRPC_CFSTREAM_CLIENT */ diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 9b633efc57..c9f5def40d 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -185,7 +185,8 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, transport->vtable->set_pollset_set(transport, stream, pollset_set); } else { #ifdef GRPC_CFSTREAM - // No-op for empty pollset + // No-op for empty pollset. CFStream does not use file destriptors. When + // CFStream is used, the fd pollset is possible to be null. #else abort(); #endif diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 6dee3b81f1..92864b2559 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -82,8 +82,10 @@ CORE_SOURCE_FILES = [ 'src/core/lib/http/httpcli.cc', 'src/core/lib/http/parser.cc', 'src/core/lib/iomgr/call_combiner.cc', + 'src/core/lib/iomgr/cfstream_handle.cc', 'src/core/lib/iomgr/combiner.cc', 'src/core/lib/iomgr/endpoint.cc', + 'src/core/lib/iomgr/endpoint_cfstream.cc', 'src/core/lib/iomgr/endpoint_pair_posix.cc', 'src/core/lib/iomgr/endpoint_pair_uv.cc', 'src/core/lib/iomgr/endpoint_pair_windows.cc', @@ -135,8 +137,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/iomgr/socket_utils_uv.cc', 'src/core/lib/iomgr/socket_utils_windows.cc', 'src/core/lib/iomgr/socket_windows.cc', - 'src/core/lib/iomgr/tcp_cfstream.cc', - 'src/core/lib/iomgr/tcp_cfstream_sync.cc', 'src/core/lib/iomgr/tcp_client.cc', 'src/core/lib/iomgr/tcp_client_cfstream.cc', 'src/core/lib/iomgr/tcp_client_custom.cc', diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index e593c9d736..cb1c75b988 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1061,9 +1061,11 @@ src/core/lib/http/httpcli.h \ src/core/lib/http/parser.h \ src/core/lib/iomgr/block_annotate.h \ src/core/lib/iomgr/call_combiner.h \ +src/core/lib/iomgr/cfstream_handle.h \ src/core/lib/iomgr/closure.h \ src/core/lib/iomgr/combiner.h \ src/core/lib/iomgr/endpoint.h \ +src/core/lib/iomgr/endpoint_cfstream.h \ src/core/lib/iomgr/endpoint_pair.h \ src/core/lib/iomgr/error.h \ src/core/lib/iomgr/error_apple.h \ @@ -1108,8 +1110,6 @@ src/core/lib/iomgr/socket_utils.h \ src/core/lib/iomgr/socket_utils_posix.h \ src/core/lib/iomgr/socket_windows.h \ src/core/lib/iomgr/sys_epoll_wrapper.h \ -src/core/lib/iomgr/tcp_cfstream.h \ -src/core/lib/iomgr/tcp_cfstream_sync.h \ src/core/lib/iomgr/tcp_client.h \ src/core/lib/iomgr/tcp_client_posix.h \ src/core/lib/iomgr/tcp_custom.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index 4c68e33d43..be32966025 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1147,11 +1147,15 @@ src/core/lib/iomgr/README.md \ src/core/lib/iomgr/block_annotate.h \ src/core/lib/iomgr/call_combiner.cc \ src/core/lib/iomgr/call_combiner.h \ +src/core/lib/iomgr/cfstream_handle.cc \ +src/core/lib/iomgr/cfstream_handle.h \ src/core/lib/iomgr/closure.h \ src/core/lib/iomgr/combiner.cc \ src/core/lib/iomgr/combiner.h \ src/core/lib/iomgr/endpoint.cc \ src/core/lib/iomgr/endpoint.h \ +src/core/lib/iomgr/endpoint_cfstream.cc \ +src/core/lib/iomgr/endpoint_cfstream.h \ src/core/lib/iomgr/endpoint_pair.h \ src/core/lib/iomgr/endpoint_pair_posix.cc \ src/core/lib/iomgr/endpoint_pair_uv.cc \ @@ -1247,10 +1251,6 @@ src/core/lib/iomgr/socket_utils_windows.cc \ src/core/lib/iomgr/socket_windows.cc \ src/core/lib/iomgr/socket_windows.h \ src/core/lib/iomgr/sys_epoll_wrapper.h \ -src/core/lib/iomgr/tcp_cfstream.cc \ -src/core/lib/iomgr/tcp_cfstream.h \ -src/core/lib/iomgr/tcp_cfstream_sync.cc \ -src/core/lib/iomgr/tcp_cfstream_sync.h \ src/core/lib/iomgr/tcp_client.cc \ src/core/lib/iomgr/tcp_client.h \ src/core/lib/iomgr/tcp_client_cfstream.cc \ diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 3de860c3c7..adb36bb182 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -9280,8 +9280,10 @@ "src/core/lib/http/httpcli.cc", "src/core/lib/http/parser.cc", "src/core/lib/iomgr/call_combiner.cc", + "src/core/lib/iomgr/cfstream_handle.cc", "src/core/lib/iomgr/combiner.cc", "src/core/lib/iomgr/endpoint.cc", + "src/core/lib/iomgr/endpoint_cfstream.cc", "src/core/lib/iomgr/endpoint_pair_posix.cc", "src/core/lib/iomgr/endpoint_pair_uv.cc", "src/core/lib/iomgr/endpoint_pair_windows.cc", @@ -9333,8 +9335,6 @@ "src/core/lib/iomgr/socket_utils_uv.cc", "src/core/lib/iomgr/socket_utils_windows.cc", "src/core/lib/iomgr/socket_windows.cc", - "src/core/lib/iomgr/tcp_cfstream.cc", - "src/core/lib/iomgr/tcp_cfstream_sync.cc", "src/core/lib/iomgr/tcp_client.cc", "src/core/lib/iomgr/tcp_client_cfstream.cc", "src/core/lib/iomgr/tcp_client_custom.cc", @@ -9462,9 +9462,11 @@ "src/core/lib/http/parser.h", "src/core/lib/iomgr/block_annotate.h", "src/core/lib/iomgr/call_combiner.h", + "src/core/lib/iomgr/cfstream_handle.h", "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/combiner.h", "src/core/lib/iomgr/endpoint.h", + "src/core/lib/iomgr/endpoint_cfstream.h", "src/core/lib/iomgr/endpoint_pair.h", "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/error_apple.h", @@ -9509,8 +9511,6 @@ "src/core/lib/iomgr/socket_utils_posix.h", "src/core/lib/iomgr/socket_windows.h", "src/core/lib/iomgr/sys_epoll_wrapper.h", - "src/core/lib/iomgr/tcp_cfstream.h", - "src/core/lib/iomgr/tcp_cfstream_sync.h", "src/core/lib/iomgr/tcp_client.h", "src/core/lib/iomgr/tcp_client_posix.h", "src/core/lib/iomgr/tcp_custom.h", @@ -9614,9 +9614,11 @@ "src/core/lib/http/parser.h", "src/core/lib/iomgr/block_annotate.h", "src/core/lib/iomgr/call_combiner.h", + "src/core/lib/iomgr/cfstream_handle.h", "src/core/lib/iomgr/closure.h", "src/core/lib/iomgr/combiner.h", "src/core/lib/iomgr/endpoint.h", + "src/core/lib/iomgr/endpoint_cfstream.h", "src/core/lib/iomgr/endpoint_pair.h", "src/core/lib/iomgr/error.h", "src/core/lib/iomgr/error_apple.h", @@ -9661,8 +9663,6 @@ "src/core/lib/iomgr/socket_utils_posix.h", "src/core/lib/iomgr/socket_windows.h", "src/core/lib/iomgr/sys_epoll_wrapper.h", - "src/core/lib/iomgr/tcp_cfstream.h", - "src/core/lib/iomgr/tcp_cfstream_sync.h", "src/core/lib/iomgr/tcp_client.h", "src/core/lib/iomgr/tcp_client_posix.h", "src/core/lib/iomgr/tcp_custom.h", |