diff options
Diffstat (limited to 'src/core')
27 files changed, 1062 insertions, 28 deletions
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index b604f2bf14..ebe2c4c41c 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -18,7 +18,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) +#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) #include <ares.h> #include <sys/ioctl.h> @@ -348,4 +348,4 @@ void grpc_ares_ev_driver_start(grpc_ares_ev_driver* ev_driver) { gpr_mu_unlock(&ev_driver->mu); } -#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET) */ +#endif /* GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER) */ diff --git a/src/core/lib/iomgr/error_apple.cc b/src/core/lib/iomgr/error_apple.cc new file mode 100644 index 0000000000..95d69ecee9 --- /dev/null +++ b/src/core/lib/iomgr/error_apple.cc @@ -0,0 +1,51 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_CFSTREAM +#include <CoreFoundation/CoreFoundation.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/error.h" + +#define MAX_ERROR_DESCRIPTION 256 + +grpc_error* grpc_error_create_from_cferror(const char* file, int line, + void* arg, const char* custom_desc) { + CFErrorRef error = static_cast<CFErrorRef>(arg); + char buf_domain[MAX_ERROR_DESCRIPTION], buf_desc[MAX_ERROR_DESCRIPTION]; + char* error_msg; + CFErrorDomain domain = CFErrorGetDomain((error)); + CFIndex code = CFErrorGetCode((error)); + CFStringRef desc = CFErrorCopyDescription((error)); + CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)", + custom_desc, buf_domain, code, buf_desc); + CFRelease(desc); + grpc_error* return_error = grpc_error_create( + file, line, grpc_slice_from_copied_string(error_msg), NULL, 0); + gpr_free(error_msg); + return return_error; +} +#endif /* GRPC_CFSTREAM */ diff --git a/src/core/lib/iomgr/error_apple.h b/src/core/lib/iomgr/error_apple.h new file mode 100644 index 0000000000..276ec77d35 --- /dev/null +++ b/src/core/lib/iomgr/error_apple.h @@ -0,0 +1,31 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_ERROR_APPLE_H +#define GRPC_CORE_LIB_IOMGR_ERROR_APPLE_H + +#ifdef GRPC_CFSTREAM +// Create an error from Apple Core Foundation CFError object +#define GRPC_ERROR_CREATE_FROM_CFERROR(error, desc) \ + grpc_error_create_from_cferror(__FILE__, __LINE__, \ + static_cast<void*>((error)), (desc)) +grpc_error* grpc_error_create_from_cferror(const char* file, int line, + void* arg, const char* desc); +#endif /* GRPC_CFSTREAM */ + +#endif /* GRPC_CORE_LIB_IOMGR_ERROR_APPLE_H */ diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index e5db1be0e0..cf839619cd 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1237,12 +1237,12 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { } #else /* defined(GRPC_LINUX_EPOLL) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1) #include "src/core/lib/iomgr/ev_epoll1_linux.h" /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { return nullptr; } -#endif /* defined(GRPC_POSIX_SOCKET) */ +#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */ #endif /* !defined(GRPC_LINUX_EPOLL) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 4c6cff7fe2..c86ed1e6c9 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1556,7 +1556,7 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( } #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) #include "src/core/lib/iomgr/ev_epollex_linux.h" /* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means epoll_create1 is not available. Return NULL */ @@ -1564,6 +1564,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { return nullptr; } -#endif /* defined(GRPC_POSIX_SOCKET) */ +#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLEX) */ #endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index 494bc71c1d..a144817a83 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1721,7 +1721,7 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( } #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */ -#if defined(GRPC_POSIX_SOCKET) +#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG) #include "src/core/lib/iomgr/ev_epollsig_linux.h" /* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means epoll_create1 is not available. Return NULL */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 504787e659..70958ed562 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_EV_POLL #include "src/core/lib/iomgr/ev_poll_posix.h" @@ -1761,4 +1761,4 @@ const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) { return &vtable; } -#endif +#endif /* GRPC_POSIX_SOCKET_EV_POLL */ diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 6bd1dc8e50..dd3556ea40 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_EV #include "src/core/lib/iomgr/ev_posix.h" @@ -334,4 +334,4 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) { g_event_engine->pollset_set_del_fd(pollset_set, fd); } -#endif // GRPC_POSIX_SOCKET +#endif // GRPC_POSIX_SOCKET_EV diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc index 66c9cb7ff7..ca7334c9a4 100644 --- a/src/core/lib/iomgr/iomgr_posix.cc +++ b/src/core/lib/iomgr/iomgr_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_IOMGR #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" @@ -64,4 +64,4 @@ void grpc_set_default_iomgr_platform() { grpc_set_iomgr_platform_vtable(&vtable); } -#endif /* GRPC_POSIX_SOCKET */ +#endif /* GRPC_POSIX_SOCKET_IOMGR */ diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc index 9f164f65b0..9c21e1f488 100644 --- a/src/core/lib/iomgr/polling_entity.cc +++ b/src/core/lib/iomgr/polling_entity.cc @@ -61,8 +61,14 @@ bool grpc_polling_entity_is_empty(const grpc_polling_entity* pollent) { void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst) { if (pollent->tag == GRPC_POLLS_POLLSET) { +#ifdef GRPC_CFSTREAM + if (pollent->pollent.pollset != nullptr) { + grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset); + } +#else GPR_ASSERT(pollent->pollent.pollset != nullptr); grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset); +#endif } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { GPR_ASSERT(pollent->pollent.pollset_set != nullptr); grpc_pollset_set_add_pollset_set(pss_dst, pollent->pollent.pollset_set); @@ -75,8 +81,14 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent, void grpc_polling_entity_del_from_pollset_set(grpc_polling_entity* pollent, grpc_pollset_set* pss_dst) { if (pollent->tag == GRPC_POLLS_POLLSET) { +#ifdef GRPC_CFSTREAM + if (pollent->pollent.pollset != nullptr) { + grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset); + } +#else GPR_ASSERT(pollent->pollent.pollset != nullptr); grpc_pollset_set_del_pollset(pss_dst, pollent->pollent.pollset); +#endif } else if (pollent->tag == GRPC_POLLS_POLLSET_SET) { GPR_ASSERT(pollent->pollent.pollset_set != nullptr); grpc_pollset_set_del_pollset_set(pss_dst, pollent->pollent.pollset_set); diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index a397012003..a1a029e1bf 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -97,7 +97,26 @@ #define GRPC_MSG_IOVLEN_TYPE int #define GRPC_POSIX_FORK 1 #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#ifdef GRPC_CFSTREAM +#define GRPC_POSIX_SOCKET_IOMGR 1 +#define GRPC_CFSTREAM_TCP 1 +#define GRPC_CFSTREAM_TCP_CLIENT 1 +#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 +#define GRPC_POSIX_SOCKET_EV 1 +#define GRPC_POSIX_SOCKET_EV_EPOLL1 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1 +#define GRPC_POSIX_SOCKET_EV_POLL 1 +#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1 +#define GRPC_POSIX_SOCKET_SOCKADDR 1 +#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1 +#define GRPC_POSIX_SOCKET_TCP 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1 +#define GRPC_POSIX_SOCKET_UTILS_COMMON 1 +#else #define GRPC_POSIX_SOCKET 1 +#endif #define GRPC_POSIX_SOCKETUTILS 1 #define GRPC_POSIX_SYSCONF 1 #define GRPC_POSIX_WAKEUP_FD 1 @@ -131,12 +150,30 @@ #endif #if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \ - defined(GRPC_CUSTOM_SOCKET) != \ + defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_CFSTREAM) != \ 1 #error \ "Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET" #endif +#ifdef GRPC_POSIX_SOCKET +#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1 +#define GRPC_POSIX_SOCKET_EV 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLEX 1 +#define GRPC_POSIX_SOCKET_EV_EPOLLSIG 1 +#define GRPC_POSIX_SOCKET_EV_POLL 1 +#define GRPC_POSIX_SOCKET_EV_EPOLL1 1 +#define GRPC_POSIX_SOCKET_IOMGR 1 +#define GRPC_POSIX_SOCKET_RESOLVE_ADDRESS 1 +#define GRPC_POSIX_SOCKET_SOCKADDR 1 +#define GRPC_POSIX_SOCKET_SOCKET_FACTORY 1 +#define GRPC_POSIX_SOCKET_TCP 1 +#define GRPC_POSIX_SOCKET_TCP_CLIENT 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER 1 +#define GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON 1 +#define GRPC_POSIX_SOCKET_UTILS_COMMON 1 +#endif + #if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF) #error "Cannot define both GRPC_POSIX_HOST_NAME_MAX and GRPC_POSIX_SYSCONF" #endif diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index fe0d834582..8638935060 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -37,6 +37,10 @@ #include <sys/socket.h> #endif +#ifdef GRPC_CFSTREAM +#include <sys/socket.h> +#endif + #include "src/core/lib/iomgr/pollset_set.h" #define GRPC_MAX_SOCKADDR_SIZE 128 diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc index a82075542f..7a825643e1 100644 --- a/src/core/lib/iomgr/resolve_address_posix.cc +++ b/src/core/lib/iomgr/resolve_address_posix.cc @@ -19,7 +19,7 @@ #include <grpc/support/port_platform.h> #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS #include "src/core/lib/iomgr/sockaddr.h" diff --git a/src/core/lib/iomgr/sockaddr_posix.h b/src/core/lib/iomgr/sockaddr_posix.h index 5b18bbc465..3cedd9082d 100644 --- a/src/core/lib/iomgr/sockaddr_posix.h +++ b/src/core/lib/iomgr/sockaddr_posix.h @@ -23,7 +23,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_SOCKADDR #include <arpa/inet.h> #include <netdb.h> #include <netinet/in.h> diff --git a/src/core/lib/iomgr/socket_factory_posix.cc b/src/core/lib/iomgr/socket_factory_posix.cc index 1d1e36c0e3..57137769c8 100644 --- a/src/core/lib/iomgr/socket_factory_posix.cc +++ b/src/core/lib/iomgr/socket_factory_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_SOCKET_FACTORY #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/useful.h" diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc index 04a1767731..caee652307 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.cc +++ b/src/core/lib/iomgr/socket_utils_common_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_UTILS_COMMON #include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" diff --git a/src/core/lib/iomgr/tcp_cfstream.cc b/src/core/lib/iomgr/tcp_cfstream.cc new file mode 100644 index 0000000000..f7ef03d228 --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream.cc @@ -0,0 +1,357 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_TCP + +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/tcp_cfstream.h" + +#include <grpc/slice_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/gpr/string.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/error_apple.h" +#include "src/core/lib/iomgr/tcp_cfstream_sync.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct { + grpc_endpoint base; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamSync* stream_sync; + + grpc_closure* read_cb; + grpc_closure* write_cb; + grpc_slice_buffer* read_slices; + grpc_slice_buffer* write_slices; + + grpc_closure read_action; + grpc_closure write_action; + CFStreamEventType read_type; + + char* peer_string; + grpc_resource_user* resource_user; + grpc_resource_user_slice_allocator slice_allocator; +} CFStreamTCP; + +static void TCPFree(CFStreamTCP* tcp) { + grpc_resource_user_unref(tcp->resource_user); + CFRelease(tcp->read_stream); + CFRelease(tcp->write_stream); + CFSTREAM_SYNC_UNREF(tcp->stream_sync, "free"); + gpr_free(tcp->peer_string); + gpr_free(tcp); +} + +#ifndef NDEBUG +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file, + int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, + val - 1); + } + if (gpr_unref(&tcp->refcount)) { + TCPFree(tcp); + } +} +static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file, + int line) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, + val + 1); + } + gpr_ref(&tcp->refcount); +} +#else +#define TCP_REF(tcp, reason) tcp_ref((tcp)) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +static void tcp_unref(CFStreamTCP* tcp) { + if (gpr_unref(&tcp->refcount)) { + TCPFree(tcp); + } +} +static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); } +#endif + +static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) { + return grpc_error_set_str( + grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(tcp->peer_string)); +} + +static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb, + tcp->read_cb->cb, tcp->read_cb->cb_arg); + size_t i; + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < tcp->read_slices->count; i++) { + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); + gpr_free(dump); + } + } + grpc_closure* cb = tcp->read_cb; + tcp->read_cb = nullptr; + tcp->read_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb, + tcp->write_cb->cb, tcp->write_cb->cb_arg); + const char* str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write: error=%s", str); + } + grpc_closure* cb = tcp->write_cb; + tcp->write_cb = nullptr; + tcp->write_slices = nullptr; + GRPC_CLOSURE_SCHED(cb, error); +} + +static void ReadAction(void* arg, grpc_error* error) { + CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); + GPR_ASSERT(tcp->read_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CallReadCB(tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "read"); + return; + } + + GPR_ASSERT(tcp->read_slices->count == 1); + grpc_slice slice = tcp->read_slices->slices[0]; + size_t len = GRPC_SLICE_LENGTH(slice); + CFIndex read_size = + CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len); + if (read_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream); + CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR( + stream_error, "Read error"), + tcp)); + CFRelease(stream_error); + TCP_UNREF(tcp, "read"); + } else if (read_size == 0) { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CallReadCB(tcp, + TCPAnnotateError( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); + TCP_UNREF(tcp, "read"); + } else { + if (read_size < len) { + grpc_slice_buffer_trim_end(tcp->read_slices, len - read_size, nullptr); + } + CallReadCB(tcp, GRPC_ERROR_NONE); + TCP_UNREF(tcp, "read"); + } +} + +static void WriteAction(void* arg, grpc_error* error) { + CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); + GPR_ASSERT(tcp->write_cb != nullptr); + if (error) { + grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); + CallWriteCB(tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "write"); + return; + } + + grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices); + size_t slice_len = GRPC_SLICE_LENGTH(slice); + CFIndex write_size = CFWriteStreamWrite( + tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); + if (write_size == -1) { + grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); + CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream); + CallWriteCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR( + stream_error, "write failed."), + tcp)); + CFRelease(stream_error); + TCP_UNREF(tcp, "write"); + } else { + if (write_size < GRPC_SLICE_LENGTH(slice)) { + grpc_slice_buffer_undo_take_first( + tcp->write_slices, grpc_slice_sub(slice, write_size, slice_len)); + } + if (tcp->write_slices->length > 0) { + tcp->stream_sync->NotifyOnWrite(&tcp->write_action); + } else { + CallWriteCB(tcp, GRPC_ERROR_NONE); + TCP_UNREF(tcp, "write"); + } + + if (grpc_tcp_trace.enabled()) { + grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); + char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump); + gpr_free(dump); + grpc_slice_unref_internal(trace_slice); + } + } + grpc_slice_unref_internal(slice); +} + +static void TCPReadAllocationDone(void* arg, grpc_error* error) { + CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg); + if (error == GRPC_ERROR_NONE) { + tcp->stream_sync->NotifyOnRead(&tcp->read_action); + } else { + grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); + CallReadCB(tcp, error); + TCP_UNREF(tcp, "read"); + } +} + +static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb, + slices->length); + } + GPR_ASSERT(tcp->read_cb == nullptr); + tcp->read_cb = cb; + tcp->read_slices = slices; + grpc_slice_buffer_reset_and_unref_internal(slices); + grpc_resource_user_alloc_slices(&tcp->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + tcp->read_slices); + TCP_REF(tcp, "read"); +} + +static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { + CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb, + slices->length); + } + GPR_ASSERT(tcp->write_cb == nullptr); + tcp->write_cb = cb; + tcp->write_slices = slices; + TCP_REF(tcp, "write"); + tcp->stream_sync->NotifyOnWrite(&tcp->write_action); +} + +void TCPShutdown(grpc_endpoint* ep, grpc_error* why) { + CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p shutdown (%p)", tcp, why); + } + CFReadStreamClose(tcp->read_stream); + CFWriteStreamClose(tcp->write_stream); + tcp->stream_sync->Shutdown(why); + grpc_resource_user_shutdown(tcp->resource_user); +} + +void TCPDestroy(grpc_endpoint* ep) { + CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p destroy", tcp); + } + TCP_UNREF(tcp, "destroy"); +} + +grpc_resource_user* TCPGetResourceUser(grpc_endpoint* ep) { + CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); + return tcp->resource_user; +} + +char* TCPGetPeer(grpc_endpoint* ep) { + CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep); + return gpr_strdup(tcp->peer_string); +} + +int TCPGetFD(grpc_endpoint* ep) { return 0; } + +void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} +void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} +void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} + +static const grpc_endpoint_vtable vtable = {TCPRead, + TCPWrite, + TCPAddToPollset, + TCPAddToPollsetSet, + TCPDeleteFromPollsetSet, + TCPShutdown, + TCPDestroy, + TCPGetResourceUser, + TCPGetPeer, + TCPGetFD}; + +grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream, + const char* peer_string, + grpc_resource_quota* resource_quota, + CFStreamSync* stream_sync) { + CFStreamTCP* tcp = static_cast<CFStreamTCP*>(gpr_malloc(sizeof(CFStreamTCP))); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp, + read_stream, write_stream); + } + tcp->base.vtable = &vtable; + gpr_ref_init(&tcp->refcount, 1); + tcp->read_stream = read_stream; + tcp->write_stream = write_stream; + CFRetain(read_stream); + CFRetain(write_stream); + tcp->stream_sync = stream_sync; + CFSTREAM_SYNC_REF(tcp->stream_sync, "endpoint create"); + + tcp->peer_string = gpr_strdup(peer_string); + tcp->read_cb = nil; + tcp->write_cb = nil; + tcp->read_slices = nil; + tcp->write_slices = nil; + GRPC_CLOSURE_INIT(&tcp->read_action, ReadAction, static_cast<void*>(tcp), + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast<void*>(tcp), + grpc_schedule_on_exec_ctx); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, TCPReadAllocationDone, tcp); + + return &tcp->base; +} + +#endif /* GRPC_CFSTREAM_TCP */ diff --git a/src/core/lib/iomgr/tcp_cfstream.h b/src/core/lib/iomgr/tcp_cfstream.h new file mode 100644 index 0000000000..9f52f6a5de --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H +#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include <grpc/support/port_platform.h> + +#ifdef GRPC_CFSTREAM + +#import <CoreFoundation/CoreFoundation.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/tcp_cfstream_sync.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream, + const char* peer_string, + grpc_resource_quota* resource_quota, + CFStreamSync* stream_sync); + +#endif /* GRPC_CFSTREAM */ + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H */ diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.cc b/src/core/lib/iomgr/tcp_cfstream_sync.cc new file mode 100644 index 0000000000..5571db7491 --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream_sync.cc @@ -0,0 +1,183 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <CoreFoundation/CoreFoundation.h> +#import "src/core/lib/iomgr/tcp_cfstream_sync.h" + +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +void* CFStreamSync::Retain(void* info) { + CFStreamSync* sync = static_cast<CFStreamSync*>(info); + CFSTREAM_SYNC_REF(sync, "retain"); + return info; +} + +void CFStreamSync::Release(void* info) { + CFStreamSync* sync = static_cast<CFStreamSync*>(info); + CFSTREAM_SYNC_UNREF(sync, "release"); +} + +CFStreamSync* CFStreamSync::CreateStreamSync(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream) { + return new CFStreamSync(read_stream, write_stream); +} + +void CFStreamSync::ReadCallback(CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info) { + CFStreamSync* sync = static_cast<CFStreamSync*>(client_callback_info); + CFSTREAM_SYNC_REF(sync, "read callback"); + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), + ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %lu, %p)", + stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + sync->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + sync->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + sync->open_event_.SetReady(); + sync->read_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(sync, "read callback"); + }); +} +void CFStreamSync::WriteCallback(CFWriteStreamRef stream, + CFStreamEventType type, + void* clientCallBackInfo) { + CFStreamSync* sync = static_cast<CFStreamSync*>(clientCallBackInfo); + CFSTREAM_SYNC_REF(sync, "write callback"); + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), + ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %lu, %p)", + stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + sync->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + sync->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + sync->open_event_.SetReady(); + sync->write_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(sync, "write callback"); + }); +} + +CFStreamSync::CFStreamSync(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream) { + gpr_ref_init(&refcount_, 1); + open_event_.InitEvent(); + read_event_.InitEvent(); + write_event_.InitEvent(); + CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; + CFReadStreamSetClient( + read_stream, + kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamSync::ReadCallback, &ctx); + CFWriteStreamSetClient( + write_stream, + kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamSync::WriteCallback, &ctx); + CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); + CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); +} + +CFStreamSync::~CFStreamSync() { + open_event_.DestroyEvent(); + read_event_.DestroyEvent(); + write_event_.DestroyEvent(); +} + +void CFStreamSync::NotifyOnOpen(grpc_closure* closure) { + open_event_.NotifyOn(closure); +} + +void CFStreamSync::NotifyOnRead(grpc_closure* closure) { + read_event_.NotifyOn(closure); +} + +void CFStreamSync::NotifyOnWrite(grpc_closure* closure) { + write_event_.NotifyOn(closure); +} + +void CFStreamSync::Shutdown(grpc_error* error) { + open_event_.SetShutdown(GRPC_ERROR_REF(error)); + read_event_.SetShutdown(GRPC_ERROR_REF(error)); + write_event_.SetShutdown(GRPC_ERROR_REF(error)); + GRPC_ERROR_UNREF(error); +} + +void CFStreamSync::Ref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val, + val + 1); + } + gpr_ref(&refcount_); +} + +void CFStreamSync::Unref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val - 1); + } + if (gpr_unref(&refcount_)) { + delete this; + } +} + +#endif diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.h b/src/core/lib/iomgr/tcp_cfstream_sync.h new file mode 100644 index 0000000000..9e1d8fbed9 --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream_sync.h @@ -0,0 +1,79 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H +#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <CoreFoundation/CoreFoundation.h> + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/lockfree_event.h" + +class CFStreamSync final { + public: + static CFStreamSync* CreateStreamSync(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream); + ~CFStreamSync(); + CFStreamSync(const CFReadStreamRef& ref) = delete; + CFStreamSync(CFReadStreamRef&& ref) = delete; + CFStreamSync& operator=(const CFStreamSync& rhs) = delete; + + void NotifyOnOpen(grpc_closure* closure); + void NotifyOnRead(grpc_closure* closure); + void NotifyOnWrite(grpc_closure* closure); + void Shutdown(grpc_error* error); + + void Ref(const char* file = nullptr, int line = 0, + const char* reason = nullptr); + void Unref(const char* file = nullptr, int line = 0, + const char* reason = nullptr); + + private: + CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream); + static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void* Retain(void* info); + static void Release(void* info); + + grpc_core::LockfreeEvent open_event_; + grpc_core::LockfreeEvent read_event_; + grpc_core::LockfreeEvent write_event_; + + gpr_refcount refcount_; +}; + +#ifndef NDEBUG +#define CFSTREAM_SYNC_REF(sync, reason) \ + (sync)->Ref(__FILE__, __LINE__, (reason)) +#define CFSTREAM_SYNC_UNREF(sync, reason) \ + (sync)->Unref(__FILE__, __LINE__, (reason)) +#else +#define CFSTREAM_SYNC_REF(sync, reason) (sync)->Ref() +#define CFSTREAM_SYNC_UNREF(sync, reason) (sync)->Unref() +#endif + +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H */ diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc new file mode 100644 index 0000000000..7deaece904 --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -0,0 +1,212 @@ + +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_TCP_CLIENT + +#include <CoreFoundation/CoreFoundation.h> + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> + +#include <netinet/in.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/error_apple.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_cfstream.h" +#include "src/core/lib/iomgr/tcp_cfstream_sync.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/timer.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct CFStreamTCPConnect { + gpr_mu mu; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamSync* stream_sync; + + grpc_timer alarm; + grpc_closure on_alarm; + grpc_closure on_open; + + bool read_stream_open; + bool write_stream_open; + bool failed; + + grpc_closure* closure; + grpc_endpoint** endpoint; + int refs; + char* addr_name; + grpc_resource_quota* resource_quota; +} CFStreamTCPConnect; + +static void TCPConnectCleanup(CFStreamTCPConnect* connect) { + grpc_resource_quota_unref_internal(connect->resource_quota); + CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up"); + CFRelease(connect->read_stream); + CFRelease(connect->write_stream); + gpr_mu_destroy(&connect->mu); + gpr_free(connect->addr_name); + gpr_free(connect); +} + +static void OnAlarm(void* arg, grpc_error* error) { + CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_closure* closure = connect->closure; + connect->closure = nil; + const bool done = (--connect->refs == 0); + gpr_mu_unlock(&connect->mu); + // Only schedule a callback once, by either on_timer or on_connected. The + // first one issues callback while the second one does cleanup. + if (done) { + TCPConnectCleanup(connect); + } else { + grpc_error* error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); + GRPC_CLOSURE_SCHED(closure, error); + } +} + +static void OnOpen(void* arg, grpc_error* error) { + CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_timer_cancel(&connect->alarm); + grpc_closure* closure = connect->closure; + connect->closure = nil; + + bool done = (--connect->refs == 0); + grpc_endpoint** endpoint = connect->endpoint; + + if (done) { + gpr_mu_unlock(&connect->mu); + TCPConnectCleanup(connect); + } else { + if (error == GRPC_ERROR_NONE) { + CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream); + if (stream_error == NULL) { + stream_error = CFWriteStreamCopyError(connect->write_stream); + } + if (stream_error) { + error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error"); + CFRelease(stream_error); + } + if (error == GRPC_ERROR_NONE) { + *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, + connect->addr_name, connect->resource_quota, + connect->stream_sync); + } + } + gpr_mu_unlock(&connect->mu); + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error)); + } +} + +static void ParseResolvedAddress(const grpc_resolved_address* addr, + CFStringRef* host, int* port) { + char *host_port, *host_string, *port_string; + grpc_sockaddr_to_string(&host_port, addr, 1); + gpr_split_host_port(host_port, &host_string, &port_string); + *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8); + gpr_free(host_string); + gpr_free(port_string); + gpr_free(host_port); + *port = grpc_sockaddr_get_port(addr); +} + +static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* resolved_addr, + grpc_millis deadline) { + CFStreamTCPConnect* connect; + + connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect)); + connect->closure = closure; + connect->endpoint = ep; + connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + // connect->resource_quota = resource_quota; + connect->refs = 2; // One for the connect operation, one for the timer. + gpr_ref_init(&connect->refcount, 1); + gpr_mu_init(&connect->mu); + + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", + connect->addr_name); + } + + grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + (grpc_resource_quota*)channel_args->args[i].value.pointer.p); + } + } + } + connect->resource_quota = resource_quota; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + + CFStringRef host; + int port; + ParseResolvedAddress(resolved_addr, &host, &port); + CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, + &write_stream); + CFRelease(host); + connect->read_stream = read_stream; + connect->write_stream = write_stream; + connect->stream_sync = + CFStreamSync::CreateStreamSync(read_stream, write_stream); + GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), + grpc_schedule_on_exec_ctx); + connect->stream_sync->NotifyOnOpen(&connect->on_open); + GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, + grpc_schedule_on_exec_ctx); + gpr_mu_lock(&connect->mu); + CFReadStreamOpen(read_stream); + CFWriteStreamOpen(write_stream); + grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); + gpr_mu_unlock(&connect->mu); +} + +grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect}; + +#endif /* GRPC_CFSTREAM_TCP_CLIENT */ diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 900c056575..39da7f1637 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_CLIENT #include "src/core/lib/iomgr/tcp_client_posix.h" diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b79ffe20f1..a713d86633 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/tcp_posix.h" @@ -811,4 +811,4 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, TCP_UNREF(tcp, "destroy"); } -#endif +#endif /* GRPC_POSIX_SOCKET_TCP */ diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 484d2b6077..0a5caca906 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -25,7 +25,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_SERVER #include "src/core/lib/iomgr/tcp_server.h" @@ -559,4 +559,4 @@ grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = { tcp_server_shutdown_starting_add, tcp_server_unref, tcp_server_shutdown_listeners}; -#endif +#endif /* GRPC_POSIX_SOCKET_TCP_SERVER */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 2d95aa66d6..73afa15e65 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -20,7 +20,7 @@ #include "src/core/lib/iomgr/port.h" -#ifdef GRPC_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON #include "src/core/lib/iomgr/tcp_server_utils_posix.h" @@ -217,4 +217,4 @@ error: return ret; } -#endif /* GRPC_POSIX_SOCKET */ +#endif /* GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON */ diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index fd56997388..1f1c08b159 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -333,14 +333,26 @@ void grpc_slice_buffer_trim_end(grpc_slice_buffer* sb, size_t n, size_t slice_len = GRPC_SLICE_LENGTH(slice); if (slice_len > n) { sb->slices[idx] = grpc_slice_split_head(&slice, slice_len - n); - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } return; } else if (slice_len == n) { - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } sb->count = idx; return; } else { - grpc_slice_buffer_add_indexed(garbage, slice); + if (garbage) { + grpc_slice_buffer_add_indexed(garbage, slice); + } else { + grpc_slice_unref_internal(slice); + } n -= slice_len; sb->count = idx; } diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc index 6b41e4b37e..9b633efc57 100644 --- a/src/core/lib/transport/transport.cc +++ b/src/core/lib/transport/transport.cc @@ -184,7 +184,11 @@ void grpc_transport_set_pops(grpc_transport* transport, grpc_stream* stream, nullptr) { transport->vtable->set_pollset_set(transport, stream, pollset_set); } else { +#ifdef GRPC_CFSTREAM + // No-op for empty pollset +#else abort(); +#endif } } |