aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc4
-rw-r--r--src/core/lib/iomgr/error_apple.cc51
-rw-r--r--src/core/lib/iomgr/error_apple.h31
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc4
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc4
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_posix.cc4
-rw-r--r--src/core/lib/iomgr/iomgr_posix.cc4
-rw-r--r--src/core/lib/iomgr/polling_entity.cc12
-rw-r--r--src/core/lib/iomgr/port.h39
-rw-r--r--src/core/lib/iomgr/resolve_address.h4
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.cc2
-rw-r--r--src/core/lib/iomgr/sockaddr_posix.h2
-rw-r--r--src/core/lib/iomgr/socket_factory_posix.cc2
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_cfstream.cc357
-rw-r--r--src/core/lib/iomgr/tcp_cfstream.h52
-rw-r--r--src/core/lib/iomgr/tcp_cfstream_sync.cc183
-rw-r--r--src/core/lib/iomgr/tcp_cfstream_sync.h79
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc212
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc4
-rw-r--r--src/core/lib/slice/slice_buffer.cc18
-rw-r--r--src/core/lib/transport/transport.cc4
27 files changed, 1062 insertions, 28 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
index b604f2bf14..ebe2c4c41c 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc
@@ -18,7 +18,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
-#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET)
+#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
#include <ares.h>
#include <sys/ioctl.h>
@@ -348,4 +348,4 @@ void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) {
gpr_mu_unlock(&ev_driver->mu);
}
-#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */
+#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */
diff --git a/src/core/lib/iomgr/error_apple.cc b/src/core/lib/iomgr/error_apple.cc
new file mode 100644
index 0000000000..95d69ecee9
--- /dev/null
+++ b/src/core/lib/iomgr/error_apple.cc
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_CFSTREAM
+#include <CoreFoundation/CoreFoundation.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+
+#define MAX_ERROR_DESCRIPTION 256
+
+grpc_error* grpc_error_create_from_cferror(const char* file, int line,
+ void* arg, const char* custom_desc) {
+ CFErrorRef error = static_cast<CFErrorRef>(arg);
+ char buf_domain[MAX_ERROR_DESCRIPTION], buf_desc[MAX_ERROR_DESCRIPTION];
+ char* error_msg;
+ CFErrorDomain domain = CFErrorGetDomain((error));
+ CFIndex code = CFErrorGetCode((error));
+ CFStringRef desc = CFErrorCopyDescription((error));
+ CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)",
+ custom_desc, buf_domain, code, buf_desc);
+ CFRelease(desc);
+ grpc_error* return_error = grpc_error_create(
+ file, line, grpc_slice_from_copied_string(error_msg), NULL, 0);
+ gpr_free(error_msg);
+ return return_error;
+}
+#endif /* GRPC_CFSTREAM */
diff --git a/src/core/lib/iomgr/error_apple.h b/src/core/lib/iomgr/error_apple.h
new file mode 100644
index 0000000000..276ec77d35
--- /dev/null
+++ b/src/core/lib/iomgr/error_apple.h
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_ERROR_APPLE_H
+#define GRPC_CORE_LIB_IOMGR_ERROR_APPLE_H
+
+#ifdef GRPC_CFSTREAM
+// Create an error from Apple Core Foundation CFError object
+#define GRPC_ERROR_CREATE_FROM_CFERROR(error, desc) \
+ grpc_error_create_from_cferror(__FILE__, __LINE__, \
+ static_cast<void*>((error)), (desc))
+grpc_error* grpc_error_create_from_cferror(const char* file, int line,
+ void* arg, const char* desc);
+#endif /* GRPC_CFSTREAM */
+
+#endif /* GRPC_CORE_LIB_IOMGR_ERROR_APPLE_H */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index e5db1be0e0..cf839619cd 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1237,12 +1237,12 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
}
#else /* defined(GRPC_LINUX_EPOLL) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
return nullptr;
}
-#endif /* defined(GRPC_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 4c6cff7fe2..c86ed1e6c9 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1556,7 +1556,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLLEX)
#include "src/core/lib/iomgr/ev_epollex_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
epoll_create1 is not available. Return NULL */
@@ -1564,6 +1564,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
return nullptr;
}
-#endif /* defined(GRPC_POSIX_SOCKET) */
+#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) */
#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index 494bc71c1d..a144817a83 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1721,7 +1721,7 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
}
#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
-#if defined(GRPC_POSIX_SOCKET)
+#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG)
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
epoll_create1 is not available. Return NULL */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 504787e659..70958ed562 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_EV_POLL
#include "src/core/lib/iomgr/ev_poll_posix.h"
@@ -1761,4 +1761,4 @@ const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) {
return &vtable;
}
-#endif
+#endif /* GRPC_POSIX_SOCKET_EV_POLL */
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 6bd1dc8e50..dd3556ea40 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_EV
#include "src/core/lib/iomgr/ev_posix.h"
@@ -334,4 +334,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
g_event_engine->pollset_set_del_fd(pollset_set, fd);
}
-#endif // GRPC_POSIX_SOCKET
+#endif // GRPC_POSIX_SOCKET_EV
diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc
index 66c9cb7ff7..ca7334c9a4 100644
--- a/src/core/lib/iomgr/iomgr_posix.cc
+++ b/src/core/lib/iomgr/iomgr_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_IOMGR
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
@@ -64,4 +64,4 @@ void grpc_set_default_iomgr_platform() {
grpc_set_iomgr_platform_vtable(&vtable);
}
-#endif /* GRPC_POSIX_SOCKET */
+#endif /* GRPC_POSIX_SOCKET_IOMGR */
diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc
index 9f164f65b0..9c21e1f488 100644
--- a/src/core/lib/iomgr/polling_entity.cc
+++ b/src/core/lib/iomgr/polling_entity.cc
@@ -61,8 +61,14 @@ bool grpc_polling_entity_is_empty(const grpc_polling_entity* pollent) {
void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst) {
if (pollent->tag == GRPC_POLLS_POLLSET) {
+#ifdef GRPC_CFSTREAM
+ if (pollent->pollent.pollset != nullptr) {
+ grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset);
+ }
+#else
GPR_ASSERT(pollent->pollent.pollset != nullptr);
grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset);
+#endif
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != nullptr);
grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set);
@@ -75,8 +81,14 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst) {
if (pollent->tag == GRPC_POLLS_POLLSET) {
+#ifdef GRPC_CFSTREAM
+ if (pollent->pollent.pollset != nullptr) {
+ grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset);
+ }
+#else
GPR_ASSERT(pollent->pollent.pollset != nullptr);
grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset);
+#endif
} else if (pollent->tag == GRPC_POLLS_POLLSET_SET) {
GPR_ASSERT(pollent->pollent.pollset_set != nullptr);
grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set);
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index a397012003..a1a029e1bf 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -97,7 +97,26 @@
#define GRPC_MSG_IOVLEN_TYPE int
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
+#ifdef GRPC_CFSTREAM
+#define GRPC_POSIX_SOCKET_IOMGR 1
+#define GRPC_CFSTREAM_TCP 1
+#define GRPC_CFSTREAM_TCP_CLIENT 1
+#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
+#define GRPC_POSIX_SOCKET_EV 1
+#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1
+#define GRPC_POSIX_SOCKET_EV_POLL 1
+#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1
+#define GRPC_POSIX_SOCKET_SOCKADDR 1
+#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
+#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
+#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
+#else
#define GRPC_POSIX_SOCKET 1
+#endif
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_SYSCONF 1
#define GRPC_POSIX_WAKEUP_FD 1
@@ -131,12 +150,30 @@
#endif
#if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \
- defined(GRPC_CUSTOM_SOCKET) != \
+ defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_CFSTREAM) != \
1
#error \
"Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET"
#endif
+#ifdef GRPC_POSIX_SOCKET
+#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
+#define GRPC_POSIX_SOCKET_EV 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1
+#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1
+#define GRPC_POSIX_SOCKET_EV_POLL 1
+#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
+#define GRPC_POSIX_SOCKET_IOMGR 1
+#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1
+#define GRPC_POSIX_SOCKET_SOCKADDR 1
+#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1
+#define GRPC_POSIX_SOCKET_TCP 1
+#define GRPC_POSIX_SOCKET_TCP_CLIENT 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER 1
+#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1
+#define GRPC_POSIX_SOCKET_UTILS_COMMON 1
+#endif
+
#if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF)
#error "Cannot define both GRPC_POSIX_HOST_NAME_MAX and GRPC_POSIX_SYSCONF"
#endif
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index fe0d834582..8638935060 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -37,6 +37,10 @@
#include <sys/socket.h>
#endif
+#ifdef GRPC_CFSTREAM
+#include <sys/socket.h>
+#endif
+
#include "src/core/lib/iomgr/pollset_set.h"
#define GRPC_MAX_SOCKADDR_SIZE 128
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index a82075542f..7a825643e1 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -19,7 +19,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
#include "src/core/lib/iomgr/sockaddr.h"
diff --git a/src/core/lib/iomgr/sockaddr_posix.h b/src/core/lib/iomgr/sockaddr_posix.h
index 5b18bbc465..3cedd9082d 100644
--- a/src/core/lib/iomgr/sockaddr_posix.h
+++ b/src/core/lib/iomgr/sockaddr_posix.h
@@ -23,7 +23,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_SOCKADDR
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
diff --git a/src/core/lib/iomgr/socket_factory_posix.cc b/src/core/lib/iomgr/socket_factory_posix.cc
index 1d1e36c0e3..57137769c8 100644
--- a/src/core/lib/iomgr/socket_factory_posix.cc
+++ b/src/core/lib/iomgr/socket_factory_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_SOCKET_FACTORY
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/useful.h"
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc
index 04a1767731..caee652307 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_common_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
#include "src/core/lib/iomgr/socket_utils.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
diff --git a/src/core/lib/iomgr/tcp_cfstream.cc b/src/core/lib/iomgr/tcp_cfstream.cc
new file mode 100644
index 0000000000..f7ef03d228
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_cfstream.cc
@@ -0,0 +1,357 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_TCP
+
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/tcp_cfstream.h"
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error_apple.h"
+#include "src/core/lib/iomgr/tcp_cfstream_sync.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct {
+ grpc_endpoint base;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamSync* stream_sync;
+
+ grpc_closure* read_cb;
+ grpc_closure* write_cb;
+ grpc_slice_buffer* read_slices;
+ grpc_slice_buffer* write_slices;
+
+ grpc_closure read_action;
+ grpc_closure write_action;
+ CFStreamEventType read_type;
+
+ char* peer_string;
+ grpc_resource_user* resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
+} CFStreamTCP;
+
+static void TCPFree(CFStreamTCP* tcp) {
+ grpc_resource_user_unref(tcp->resource_user);
+ CFRelease(tcp->read_stream);
+ CFRelease(tcp->write_stream);
+ CFSTREAM_SYNC_UNREF(tcp->stream_sync, "free");
+ gpr_free(tcp->peer_string);
+ gpr_free(tcp);
+}
+
+#ifndef NDEBUG
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file,
+ int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
+ val - 1);
+ }
+ if (gpr_unref(&tcp->refcount)) {
+ TCPFree(tcp);
+ }
+}
+static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file,
+ int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
+ val + 1);
+ }
+ gpr_ref(&tcp->refcount);
+}
+#else
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+static void tcp_unref(CFStreamTCP* tcp) {
+ if (gpr_unref(&tcp->refcount)) {
+ TCPFree(tcp);
+ }
+}
+static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
+static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) {
+ return grpc_error_set_str(
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE),
+ GRPC_ERROR_STR_TARGET_ADDRESS,
+ grpc_slice_from_copied_string(tcp->peer_string));
+}
+
+static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb,
+ tcp->read_cb->cb, tcp->read_cb->cb_arg);
+ size_t i;
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+
+ for (i = 0; i < tcp->read_slices->count; i++) {
+ char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
+ gpr_free(dump);
+ }
+ }
+ grpc_closure* cb = tcp->read_cb;
+ tcp->read_cb = nullptr;
+ tcp->read_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb,
+ tcp->write_cb->cb, tcp->write_cb->cb_arg);
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "write: error=%s", str);
+ }
+ grpc_closure* cb = tcp->write_cb;
+ tcp->write_cb = nullptr;
+ tcp->write_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void ReadAction(void* arg, grpc_error* error) {
+ CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg);
+ GPR_ASSERT(tcp->read_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ CallReadCB(tcp, GRPC_ERROR_REF(error));
+ TCP_UNREF(tcp, "read");
+ return;
+ }
+
+ GPR_ASSERT(tcp->read_slices->count == 1);
+ grpc_slice slice = tcp->read_slices->slices[0];
+ size_t len = GRPC_SLICE_LENGTH(slice);
+ CFIndex read_size =
+ CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len);
+ if (read_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream);
+ CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(
+ stream_error, "Read error"),
+ tcp));
+ CFRelease(stream_error);
+ TCP_UNREF(tcp, "read");
+ } else if (read_size == 0) {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ CallReadCB(tcp,
+ TCPAnnotateError(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
+ TCP_UNREF(tcp, "read");
+ } else {
+ if (read_size < len) {
+ grpc_slice_buffer_trim_end(tcp->read_slices, len - read_size, nullptr);
+ }
+ CallReadCB(tcp, GRPC_ERROR_NONE);
+ TCP_UNREF(tcp, "read");
+ }
+}
+
+static void WriteAction(void* arg, grpc_error* error) {
+ CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg);
+ GPR_ASSERT(tcp->write_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices);
+ CallWriteCB(tcp, GRPC_ERROR_REF(error));
+ TCP_UNREF(tcp, "write");
+ return;
+ }
+
+ grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices);
+ size_t slice_len = GRPC_SLICE_LENGTH(slice);
+ CFIndex write_size = CFWriteStreamWrite(
+ tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
+ if (write_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices);
+ CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream);
+ CallWriteCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(
+ stream_error, "write failed."),
+ tcp));
+ CFRelease(stream_error);
+ TCP_UNREF(tcp, "write");
+ } else {
+ if (write_size < GRPC_SLICE_LENGTH(slice)) {
+ grpc_slice_buffer_undo_take_first(
+ tcp->write_slices, grpc_slice_sub(slice, write_size, slice_len));
+ }
+ if (tcp->write_slices->length > 0) {
+ tcp->stream_sync->NotifyOnWrite(&tcp->write_action);
+ } else {
+ CallWriteCB(tcp, GRPC_ERROR_NONE);
+ TCP_UNREF(tcp, "write");
+ }
+
+ if (grpc_tcp_trace.enabled()) {
+ grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
+ char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump);
+ gpr_free(dump);
+ grpc_slice_unref_internal(trace_slice);
+ }
+ }
+ grpc_slice_unref_internal(slice);
+}
+
+static void TCPReadAllocationDone(void* arg, grpc_error* error) {
+ CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg);
+ if (error == GRPC_ERROR_NONE) {
+ tcp->stream_sync->NotifyOnRead(&tcp->read_action);
+ } else {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ CallReadCB(tcp, error);
+ TCP_UNREF(tcp, "read");
+ }
+}
+
+static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb,
+ slices->length);
+ }
+ GPR_ASSERT(tcp->read_cb == nullptr);
+ tcp->read_cb = cb;
+ tcp->read_slices = slices;
+ grpc_slice_buffer_reset_and_unref_internal(slices);
+ grpc_resource_user_alloc_slices(&tcp->slice_allocator,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
+ tcp->read_slices);
+ TCP_REF(tcp, "read");
+}
+
+static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb,
+ slices->length);
+ }
+ GPR_ASSERT(tcp->write_cb == nullptr);
+ tcp->write_cb = cb;
+ tcp->write_slices = slices;
+ TCP_REF(tcp, "write");
+ tcp->stream_sync->NotifyOnWrite(&tcp->write_action);
+}
+
+void TCPShutdown(grpc_endpoint* ep, grpc_error* why) {
+ CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "tcp:%p shutdown (%p)", tcp, why);
+ }
+ CFReadStreamClose(tcp->read_stream);
+ CFWriteStreamClose(tcp->write_stream);
+ tcp->stream_sync->Shutdown(why);
+ grpc_resource_user_shutdown(tcp->resource_user);
+}
+
+void TCPDestroy(grpc_endpoint* ep) {
+ CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "tcp:%p destroy", tcp);
+ }
+ TCP_UNREF(tcp, "destroy");
+}
+
+grpc_resource_user* TCPGetResourceUser(grpc_endpoint* ep) {
+ CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
+ return tcp->resource_user;
+}
+
+char* TCPGetPeer(grpc_endpoint* ep) {
+ CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
+ return gpr_strdup(tcp->peer_string);
+}
+
+int TCPGetFD(grpc_endpoint* ep) { return 0; }
+
+void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
+void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
+void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
+
+static const grpc_endpoint_vtable vtable = {TCPRead,
+ TCPWrite,
+ TCPAddToPollset,
+ TCPAddToPollsetSet,
+ TCPDeleteFromPollsetSet,
+ TCPShutdown,
+ TCPDestroy,
+ TCPGetResourceUser,
+ TCPGetPeer,
+ TCPGetFD};
+
+grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream,
+ const char* peer_string,
+ grpc_resource_quota* resource_quota,
+ CFStreamSync* stream_sync) {
+ CFStreamTCP* tcp = static_cast<CFStreamTCP*>(gpr_malloc(sizeof(CFStreamTCP)));
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp,
+ read_stream, write_stream);
+ }
+ tcp->base.vtable = &vtable;
+ gpr_ref_init(&tcp->refcount, 1);
+ tcp->read_stream = read_stream;
+ tcp->write_stream = write_stream;
+ CFRetain(read_stream);
+ CFRetain(write_stream);
+ tcp->stream_sync = stream_sync;
+ CFSTREAM_SYNC_REF(tcp->stream_sync, "endpoint create");
+
+ tcp->peer_string = gpr_strdup(peer_string);
+ tcp->read_cb = nil;
+ tcp->write_cb = nil;
+ tcp->read_slices = nil;
+ tcp->write_slices = nil;
+ GRPC_CLOSURE_INIT(&tcp->read_action, ReadAction, static_cast<void*>(tcp),
+ grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast<void*>(tcp),
+ grpc_schedule_on_exec_ctx);
+ tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(
+ &tcp->slice_allocator, tcp->resource_user, TCPReadAllocationDone, tcp);
+
+ return &tcp->base;
+}
+
+#endif /* GRPC_CFSTREAM_TCP */
diff --git a/src/core/lib/iomgr/tcp_cfstream.h b/src/core/lib/iomgr/tcp_cfstream.h
new file mode 100644
index 0000000000..9f52f6a5de
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_cfstream.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H
+#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H
+/*
+ Low level TCP "bottom half" implementation, for use by transports built on
+ top of a TCP connection.
+
+ Note that this file does not (yet) include APIs for creating the socket in
+ the first place.
+
+ All calls passing slice transfer ownership of a slice refcount unless
+ otherwise specified.
+*/
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GRPC_CFSTREAM
+
+#import <CoreFoundation/CoreFoundation.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/tcp_cfstream_sync.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream,
+ const char* peer_string,
+ grpc_resource_quota* resource_quota,
+ CFStreamSync* stream_sync);
+
+#endif /* GRPC_CFSTREAM */
+
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H */
diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.cc b/src/core/lib/iomgr/tcp_cfstream_sync.cc
new file mode 100644
index 0000000000..5571db7491
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_cfstream_sync.cc
@@ -0,0 +1,183 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/tcp_cfstream_sync.h"
+
+#include <grpc/support/atm.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+void* CFStreamSync::Retain(void* info) {
+ CFStreamSync* sync = static_cast<CFStreamSync*>(info);
+ CFSTREAM_SYNC_REF(sync, "retain");
+ return info;
+}
+
+void CFStreamSync::Release(void* info) {
+ CFStreamSync* sync = static_cast<CFStreamSync*>(info);
+ CFSTREAM_SYNC_UNREF(sync, "release");
+}
+
+CFStreamSync* CFStreamSync::CreateStreamSync(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream) {
+ return new CFStreamSync(read_stream, write_stream);
+}
+
+void CFStreamSync::ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
+ void* client_callback_info) {
+ CFStreamSync* sync = static_cast<CFStreamSync*>(client_callback_info);
+ CFSTREAM_SYNC_REF(sync, "read callback");
+ dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
+ ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %lu, %p)",
+ stream, type, client_callback_info);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ sync->open_event_.SetReady();
+ break;
+ case kCFStreamEventHasBytesAvailable:
+ case kCFStreamEventEndEncountered:
+ sync->read_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ sync->open_event_.SetReady();
+ sync->read_event_.SetReady();
+ break;
+ default:
+ // Impossible
+ abort();
+ }
+ CFSTREAM_SYNC_UNREF(sync, "read callback");
+ });
+}
+void CFStreamSync::WriteCallback(CFWriteStreamRef stream,
+ CFStreamEventType type,
+ void* clientCallBackInfo) {
+ CFStreamSync* sync = static_cast<CFStreamSync*>(clientCallBackInfo);
+ CFSTREAM_SYNC_REF(sync, "write callback");
+ dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
+ ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %lu, %p)",
+ stream, type, clientCallBackInfo);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ sync->open_event_.SetReady();
+ break;
+ case kCFStreamEventCanAcceptBytes:
+ case kCFStreamEventEndEncountered:
+ sync->write_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ sync->open_event_.SetReady();
+ sync->write_event_.SetReady();
+ break;
+ default:
+ // Impossible
+ abort();
+ }
+ CFSTREAM_SYNC_UNREF(sync, "write callback");
+ });
+}
+
+CFStreamSync::CFStreamSync(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream) {
+ gpr_ref_init(&refcount_, 1);
+ open_event_.InitEvent();
+ read_event_.InitEvent();
+ write_event_.InitEvent();
+ CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil};
+ CFReadStreamSetClient(
+ read_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamSync::ReadCallback, &ctx);
+ CFWriteStreamSetClient(
+ write_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamSync::WriteCallback, &ctx);
+ CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+ CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+}
+
+CFStreamSync::~CFStreamSync() {
+ open_event_.DestroyEvent();
+ read_event_.DestroyEvent();
+ write_event_.DestroyEvent();
+}
+
+void CFStreamSync::NotifyOnOpen(grpc_closure* closure) {
+ open_event_.NotifyOn(closure);
+}
+
+void CFStreamSync::NotifyOnRead(grpc_closure* closure) {
+ read_event_.NotifyOn(closure);
+}
+
+void CFStreamSync::NotifyOnWrite(grpc_closure* closure) {
+ write_event_.NotifyOn(closure);
+}
+
+void CFStreamSync::Shutdown(grpc_error* error) {
+ open_event_.SetShutdown(GRPC_ERROR_REF(error));
+ read_event_.SetShutdown(GRPC_ERROR_REF(error));
+ write_event_.SetShutdown(GRPC_ERROR_REF(error));
+ GRPC_ERROR_UNREF(error);
+}
+
+void CFStreamSync::Ref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val,
+ val + 1);
+ }
+ gpr_ref(&refcount_);
+}
+
+void CFStreamSync::Unref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val - 1);
+ }
+ if (gpr_unref(&refcount_)) {
+ delete this;
+ }
+}
+
+#endif
diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.h b/src/core/lib/iomgr/tcp_cfstream_sync.h
new file mode 100644
index 0000000000..9e1d8fbed9
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_cfstream_sync.h
@@ -0,0 +1,79 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H
+#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+#import <CoreFoundation/CoreFoundation.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/lockfree_event.h"
+
+class CFStreamSync final {
+ public:
+ static CFStreamSync* CreateStreamSync(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream);
+ ~CFStreamSync();
+ CFStreamSync(const CFReadStreamRef& ref) = delete;
+ CFStreamSync(CFReadStreamRef&& ref) = delete;
+ CFStreamSync& operator=(const CFStreamSync& rhs) = delete;
+
+ void NotifyOnOpen(grpc_closure* closure);
+ void NotifyOnRead(grpc_closure* closure);
+ void NotifyOnWrite(grpc_closure* closure);
+ void Shutdown(grpc_error* error);
+
+ void Ref(const char* file = nullptr, int line = 0,
+ const char* reason = nullptr);
+ void Unref(const char* file = nullptr, int line = 0,
+ const char* reason = nullptr);
+
+ private:
+ CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream);
+ static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+ static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type,
+ void* client_callback_info);
+ static void* Retain(void* info);
+ static void Release(void* info);
+
+ grpc_core::LockfreeEvent open_event_;
+ grpc_core::LockfreeEvent read_event_;
+ grpc_core::LockfreeEvent write_event_;
+
+ gpr_refcount refcount_;
+};
+
+#ifndef NDEBUG
+#define CFSTREAM_SYNC_REF(sync, reason) \
+ (sync)->Ref(__FILE__, __LINE__, (reason))
+#define CFSTREAM_SYNC_UNREF(sync, reason) \
+ (sync)->Unref(__FILE__, __LINE__, (reason))
+#else
+#define CFSTREAM_SYNC_REF(sync, reason) (sync)->Ref()
+#define CFSTREAM_SYNC_UNREF(sync, reason) (sync)->Unref()
+#endif
+
+#endif
+
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H */
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
new file mode 100644
index 0000000000..7deaece904
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -0,0 +1,212 @@
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_TCP_CLIENT
+
+#include <CoreFoundation/CoreFoundation.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include <netinet/in.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/error_apple.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_cfstream.h"
+#include "src/core/lib/iomgr/tcp_cfstream_sync.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/timer.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct CFStreamTCPConnect {
+ gpr_mu mu;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamSync* stream_sync;
+
+ grpc_timer alarm;
+ grpc_closure on_alarm;
+ grpc_closure on_open;
+
+ bool read_stream_open;
+ bool write_stream_open;
+ bool failed;
+
+ grpc_closure* closure;
+ grpc_endpoint** endpoint;
+ int refs;
+ char* addr_name;
+ grpc_resource_quota* resource_quota;
+} CFStreamTCPConnect;
+
+static void TCPConnectCleanup(CFStreamTCPConnect* connect) {
+ grpc_resource_quota_unref_internal(connect->resource_quota);
+ CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up");
+ CFRelease(connect->read_stream);
+ CFRelease(connect->write_stream);
+ gpr_mu_destroy(&connect->mu);
+ gpr_free(connect->addr_name);
+ gpr_free(connect);
+}
+
+static void OnAlarm(void* arg, grpc_error* error) {
+ CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+ const bool done = (--connect->refs == 0);
+ gpr_mu_unlock(&connect->mu);
+ // Only schedule a callback once, by either on_timer or on_connected. The
+ // first one issues callback while the second one does cleanup.
+ if (done) {
+ TCPConnectCleanup(connect);
+ } else {
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
+ GRPC_CLOSURE_SCHED(closure, error);
+ }
+}
+
+static void OnOpen(void* arg, grpc_error* error) {
+ CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_timer_cancel(&connect->alarm);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+
+ bool done = (--connect->refs == 0);
+ grpc_endpoint** endpoint = connect->endpoint;
+
+ if (done) {
+ gpr_mu_unlock(&connect->mu);
+ TCPConnectCleanup(connect);
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
+ if (stream_error == NULL) {
+ stream_error = CFWriteStreamCopyError(connect->write_stream);
+ }
+ if (stream_error) {
+ error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
+ CFRelease(stream_error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream,
+ connect->addr_name, connect->resource_quota,
+ connect->stream_sync);
+ }
+ }
+ gpr_mu_unlock(&connect->mu);
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error));
+ }
+}
+
+static void ParseResolvedAddress(const grpc_resolved_address* addr,
+ CFStringRef* host, int* port) {
+ char *host_port, *host_string, *port_string;
+ grpc_sockaddr_to_string(&host_port, addr, 1);
+ gpr_split_host_port(host_port, &host_string, &port_string);
+ *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8);
+ gpr_free(host_string);
+ gpr_free(port_string);
+ gpr_free(host_port);
+ *port = grpc_sockaddr_get_port(addr);
+}
+
+static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* resolved_addr,
+ grpc_millis deadline) {
+ CFStreamTCPConnect* connect;
+
+ connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect));
+ connect->closure = closure;
+ connect->endpoint = ep;
+ connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+ // connect->resource_quota = resource_quota;
+ connect->refs = 2; // One for the connect operation, one for the timer.
+ gpr_ref_init(&connect->refcount, 1);
+ gpr_mu_init(&connect->mu);
+
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
+ connect->addr_name);
+ }
+
+ grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+ connect->resource_quota = resource_quota;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+
+ CFStringRef host;
+ int port;
+ ParseResolvedAddress(resolved_addr, &host, &port);
+ CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
+ &write_stream);
+ CFRelease(host);
+ connect->read_stream = read_stream;
+ connect->write_stream = write_stream;
+ connect->stream_sync =
+ CFStreamSync::CreateStreamSync(read_stream, write_stream);
+ GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
+ grpc_schedule_on_exec_ctx);
+ connect->stream_sync->NotifyOnOpen(&connect->on_open);
+ GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
+ grpc_schedule_on_exec_ctx);
+ gpr_mu_lock(&connect->mu);
+ CFReadStreamOpen(read_stream);
+ CFWriteStreamOpen(write_stream);
+ grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
+ gpr_mu_unlock(&connect->mu);
+}
+
+grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect};
+
+#endif /* GRPC_CFSTREAM_TCP_CLIENT */
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 900c056575..39da7f1637 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_CLIENT
#include "src/core/lib/iomgr/tcp_client_posix.h"
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index b79ffe20f1..a713d86633 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/tcp_posix.h"
@@ -811,4 +811,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
TCP_UNREF(tcp, "destroy");
}
-#endif
+#endif /* GRPC_POSIX_SOCKET_TCP */
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 484d2b6077..0a5caca906 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -25,7 +25,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_SERVER
#include "src/core/lib/iomgr/tcp_server.h"
@@ -559,4 +559,4 @@ grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
tcp_server_shutdown_starting_add,
tcp_server_unref,
tcp_server_shutdown_listeners};
-#endif
+#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 2d95aa66d6..73afa15e65 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -20,7 +20,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_POSIX_SOCKET
+#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
#include "src/core/lib/iomgr/tcp_server_utils_posix.h"
@@ -217,4 +217,4 @@ error:
return ret;
}
-#endif /* GRPC_POSIX_SOCKET */
+#endif /* GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON */
diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc
index fd56997388..1f1c08b159 100644
--- a/src/core/lib/slice/slice_buffer.cc
+++ b/src/core/lib/slice/slice_buffer.cc
@@ -333,14 +333,26 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n,
size_t slice_len = GRPC_SLICE_LENGTH(slice);
if (slice_len > n) {
sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n);
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
return;
} else if (slice_len == n) {
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
sb->count = idx;
return;
} else {
- grpc_slice_buffer_add_indexed(garbage, slice);
+ if (garbage) {
+ grpc_slice_buffer_add_indexed(garbage, slice);
+ } else {
+ grpc_slice_unref_internal(slice);
+ }
n -= slice_len;
sb->count = idx;
}
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index 6b41e4b37e..9b633efc57 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -184,7 +184,11 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream,
nullptr) {
transport->vtable->set_pollset_set(transport, stream, pollset_set);
} else {
+#ifdef GRPC_CFSTREAM
+ // No-op for empty pollset
+#else
abort();
+#endif
}
}