aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-05-30 15:42:31 -0700
committerGravatar Muxi Yan <mxyan@google.com>2018-05-30 16:00:06 -0700
commit142cbb59480794980e53062c846bcc2d7512548a (patch)
treeb393255c68f2f1141e703d2cb1bc70312bd1eb97 /src/core/lib/iomgr
parenta6e8d0ba24a684713b879eb18e7ce5313f8030cf (diff)
Address comments on names and comments
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/cfstream_handle.cc185
-rw-r--r--src/core/lib/iomgr/cfstream_handle.h (renamed from src/core/lib/iomgr/tcp_cfstream_sync.h)25
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.cc372
-rw-r--r--src/core/lib/iomgr/endpoint_cfstream.h (renamed from src/core/lib/iomgr/tcp_cfstream.h)19
-rw-r--r--src/core/lib/iomgr/error_apple.cc3
-rw-r--r--src/core/lib/iomgr/polling_entity.cc2
-rw-r--r--src/core/lib/iomgr/port.h4
-rw-r--r--src/core/lib/iomgr/tcp_cfstream.cc368
-rw-r--r--src/core/lib/iomgr/tcp_cfstream_sync.cc183
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc48
10 files changed, 609 insertions, 600 deletions
diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc
new file mode 100644
index 0000000000..543179d93e
--- /dev/null
+++ b/src/core/lib/iomgr/cfstream_handle.cc
@@ -0,0 +1,185 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/cfstream_handle.h"
+
+#include <grpc/support/atm.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+void* CFStreamHandle::Retain(void* info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
+ CFSTREAM_SYNC_REF(handle, "retain");
+ return info;
+}
+
+void CFStreamHandle::Release(void* info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
+ CFSTREAM_SYNC_UNREF(handle, "release");
+}
+
+CFStreamHandle* CFStreamHandle::CreateStreamSync(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
+ return new CFStreamHandle(read_stream, write_stream);
+}
+
+void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
+ CFStreamEventType type,
+ void* client_callback_info) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
+ CFSTREAM_SYNC_REF(handle, "read callback");
+ dispatch_async(
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
+ stream, type, client_callback_info);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventHasBytesAvailable:
+ case kCFStreamEventEndEncountered:
+ handle->read_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->read_event_.SetReady();
+ break;
+ default:
+ // Impossible
+ abort();
+ }
+ CFSTREAM_SYNC_UNREF(handle, "read callback");
+ });
+}
+void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
+ CFStreamEventType type,
+ void* clientCallBackInfo) {
+ CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
+ CFSTREAM_SYNC_REF(handle, "write callback");
+ dispatch_async(
+ dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
+ stream, type, clientCallBackInfo);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ handle->open_event_.SetReady();
+ break;
+ case kCFStreamEventCanAcceptBytes:
+ case kCFStreamEventEndEncountered:
+ handle->write_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ handle->open_event_.SetReady();
+ handle->write_event_.SetReady();
+ break;
+ default:
+ // Impossible
+ abort();
+ }
+ CFSTREAM_SYNC_UNREF(handle, "write callback");
+ });
+}
+
+CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream) {
+ gpr_ref_init(&refcount_, 1);
+ open_event_.InitEvent();
+ read_event_.InitEvent();
+ write_event_.InitEvent();
+ CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil};
+ CFReadStreamSetClient(
+ read_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamHandle::ReadCallback, &ctx);
+ CFWriteStreamSetClient(
+ write_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamHandle::WriteCallback, &ctx);
+ CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+ CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+}
+
+CFStreamHandle::~CFStreamHandle() {
+ open_event_.DestroyEvent();
+ read_event_.DestroyEvent();
+ write_event_.DestroyEvent();
+}
+
+void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
+ open_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
+ read_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
+ write_event_.NotifyOn(closure);
+}
+
+void CFStreamHandle::Shutdown(grpc_error* error) {
+ open_event_.SetShutdown(GRPC_ERROR_REF(error));
+ read_event_.SetShutdown(GRPC_ERROR_REF(error));
+ write_event_.SetShutdown(GRPC_ERROR_REF(error));
+ GRPC_ERROR_UNREF(error);
+}
+
+void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val + 1);
+ }
+ gpr_ref(&refcount_);
+}
+
+void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
+ gpr_log(GPR_ERROR,
+ "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val - 1);
+ }
+ if (gpr_unref(&refcount_)) {
+ delete this;
+ }
+}
+
+#endif
diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.h b/src/core/lib/iomgr/cfstream_handle.h
index 9e1d8fbed9..f6f7ae54ed 100644
--- a/src/core/lib/iomgr/tcp_cfstream_sync.h
+++ b/src/core/lib/iomgr/cfstream_handle.h
@@ -16,8 +16,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H
-#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H
+/* The CFStream handle acts as an event synchronization entity for
+ * read/write/open/error/eos events happening on CFStream streams. */
+
+#ifndef GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H
+#define GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H
#include <grpc/support/port_platform.h>
@@ -29,14 +32,14 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/lockfree_event.h"
-class CFStreamSync final {
+class CFStreamHandle final {
public:
- static CFStreamSync* CreateStreamSync(CFReadStreamRef read_stream,
- CFWriteStreamRef write_stream);
- ~CFStreamSync();
- CFStreamSync(const CFReadStreamRef& ref) = delete;
- CFStreamSync(CFReadStreamRef&& ref) = delete;
- CFStreamSync& operator=(const CFStreamSync& rhs) = delete;
+ static CFStreamHandle* CreateStreamSync(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream);
+ ~CFStreamHandle();
+ CFStreamHandle(const CFReadStreamRef& ref) = delete;
+ CFStreamHandle(CFReadStreamRef&& ref) = delete;
+ CFStreamHandle& operator=(const CFStreamHandle& rhs) = delete;
void NotifyOnOpen(grpc_closure* closure);
void NotifyOnRead(grpc_closure* closure);
@@ -49,7 +52,7 @@ class CFStreamSync final {
const char* reason = nullptr);
private:
- CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream);
+ CFStreamHandle(CFReadStreamRef read_stream, CFWriteStreamRef write_stream);
static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
void* client_callback_info);
static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type,
@@ -76,4 +79,4 @@ class CFStreamSync final {
#endif
-#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H */
+#endif /* GRPC_CORE_LIB_IOMGR_CFSTREAM_HANDLE_H */
diff --git a/src/core/lib/iomgr/endpoint_cfstream.cc b/src/core/lib/iomgr/endpoint_cfstream.cc
new file mode 100644
index 0000000000..aa4359dfb2
--- /dev/null
+++ b/src/core/lib/iomgr/endpoint_cfstream.cc
@@ -0,0 +1,372 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_ENDPOINT
+
+#import <CoreFoundation/CoreFoundation.h>
+#import "src/core/lib/iomgr/endpoint_cfstream.h"
+
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/error_apple.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct {
+ grpc_endpoint base;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamHandle* stream_sync;
+
+ grpc_closure* read_cb;
+ grpc_closure* write_cb;
+ grpc_slice_buffer* read_slices;
+ grpc_slice_buffer* write_slices;
+
+ grpc_closure read_action;
+ grpc_closure write_action;
+
+ char* peer_string;
+ grpc_resource_user* resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
+} CFStreamEndpoint;
+
+static void CFStreamFree(CFStreamEndpoint* ep) {
+ grpc_resource_user_unref(ep->resource_user);
+ CFRelease(ep->read_stream);
+ CFRelease(ep->write_stream);
+ CFSTREAM_SYNC_UNREF(ep->stream_sync, "free");
+ gpr_free(ep->peer_string);
+ gpr_free(ep);
+}
+
+#ifndef NDEBUG
+#define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
+#define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
+static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
+ reason, val, val - 1);
+ }
+ if (gpr_unref(&ep->refcount)) {
+ CFStreamFree(ep);
+ }
+}
+static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
+ reason, val, val + 1);
+ }
+ gpr_ref(&ep->refcount);
+}
+#else
+#define EP_REF(ep, reason) CFStreamRef((ep))
+#define EP_UNREF(ep, reason) CFStreamUnref((ep))
+static void CFStreamUnref(CFStreamEndpoint* ep) {
+ if (gpr_unref(&ep->refcount)) {
+ CFStreamFree(ep);
+ }
+}
+static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
+#endif
+
+static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
+ CFStreamEndpoint* ep) {
+ return grpc_error_set_str(
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE),
+ GRPC_ERROR_STR_TARGET_ADDRESS,
+ grpc_slice_from_copied_string(ep->peer_string));
+}
+
+static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
+ ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
+ size_t i;
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+
+ for (i = 0; i < ep->read_slices->count; i++) {
+ char* dump = grpc_dump_slice(ep->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump);
+ gpr_free(dump);
+ }
+ }
+ grpc_closure* cb = ep->read_cb;
+ ep->read_cb = nullptr;
+ ep->read_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
+ ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "write: error=%s", str);
+ }
+ grpc_closure* cb = ep->write_cb;
+ ep->write_cb = nullptr;
+ ep->write_slices = nullptr;
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void ReadAction(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ GPR_ASSERT(ep->read_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep, GRPC_ERROR_REF(error));
+ EP_UNREF(ep, "read");
+ return;
+ }
+
+ GPR_ASSERT(ep->read_slices->count == 1);
+ grpc_slice slice = ep->read_slices->slices[0];
+ size_t len = GRPC_SLICE_LENGTH(slice);
+ CFIndex read_size =
+ CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
+ if (read_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
+ if (stream_error != nullptr) {
+ error = CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
+ CFRelease(stream_error);
+ } else {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
+ }
+ CallReadCb(ep, error);
+ EP_UNREF(ep, "read");
+ } else if (read_size == 0) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep,
+ CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep));
+ EP_UNREF(ep, "read");
+ } else {
+ if (read_size < len) {
+ grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
+ }
+ CallReadCb(ep, GRPC_ERROR_NONE);
+ EP_UNREF(ep, "read");
+ }
+}
+
+static void WriteAction(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ GPR_ASSERT(ep->write_cb != nullptr);
+ if (error) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
+ CallWriteCb(ep, GRPC_ERROR_REF(error));
+ EP_UNREF(ep, "write");
+ return;
+ }
+
+ grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
+ size_t slice_len = GRPC_SLICE_LENGTH(slice);
+ CFIndex write_size = CFWriteStreamWrite(
+ ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
+ if (write_size == -1) {
+ grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
+ CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
+ if (stream_error != nullptr) {
+ error = CFStreamAnnotateError(
+ GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
+ CFRelease(stream_error);
+ } else {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
+ }
+ CallWriteCb(ep, error);
+ EP_UNREF(ep, "write");
+ } else {
+ if (write_size < GRPC_SLICE_LENGTH(slice)) {
+ grpc_slice_buffer_undo_take_first(
+ ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
+ }
+ if (ep->write_slices->length > 0) {
+ ep->stream_sync->NotifyOnWrite(&ep->write_action);
+ } else {
+ CallWriteCb(ep, GRPC_ERROR_NONE);
+ EP_UNREF(ep, "write");
+ }
+
+ if (grpc_tcp_trace.enabled()) {
+ grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
+ char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump);
+ gpr_free(dump);
+ grpc_slice_unref_internal(trace_slice);
+ }
+ }
+ grpc_slice_unref_internal(slice);
+}
+
+static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
+ CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
+ if (error == GRPC_ERROR_NONE) {
+ ep->stream_sync->NotifyOnRead(&ep->read_action);
+ } else {
+ grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
+ CallReadCb(ep, error);
+ EP_UNREF(ep, "read");
+ }
+}
+
+static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
+ slices, cb, slices->length);
+ }
+ GPR_ASSERT(ep_impl->read_cb == nullptr);
+ ep_impl->read_cb = cb;
+ ep_impl->read_slices = slices;
+ grpc_slice_buffer_reset_and_unref_internal(slices);
+ grpc_resource_user_alloc_slices(&ep_impl->slice_allocator,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
+ ep_impl->read_slices);
+ EP_REF(ep_impl, "read");
+}
+
+static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
+ ep_impl, slices, cb, slices->length);
+ }
+ GPR_ASSERT(ep_impl->write_cb == nullptr);
+ ep_impl->write_cb = cb;
+ ep_impl->write_slices = slices;
+ EP_REF(ep_impl, "write");
+ ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
+}
+
+void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why);
+ }
+ CFReadStreamClose(ep_impl->read_stream);
+ CFWriteStreamClose(ep_impl->write_stream);
+ ep_impl->stream_sync->Shutdown(why);
+ grpc_resource_user_shutdown(ep_impl->resource_user);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why);
+ }
+}
+
+void CFStreamDestroy(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
+ }
+ EP_UNREF(ep_impl, "destroy");
+}
+
+grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ return ep_impl->resource_user;
+}
+
+char* CFStreamGetPeer(grpc_endpoint* ep) {
+ CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
+ return gpr_strdup(ep_impl->peer_string);
+}
+
+int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
+
+void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
+void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
+void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
+ grpc_pollset_set* pollset) {}
+
+static const grpc_endpoint_vtable vtable = {CFStreamRead,
+ CFStreamWrite,
+ CFStreamAddToPollset,
+ CFStreamAddToPollsetSet,
+ CFStreamDeleteFromPollsetSet,
+ CFStreamShutdown,
+ CFStreamDestroy,
+ CFStreamGetResourceUser,
+ CFStreamGetPeer,
+ CFStreamGetFD};
+
+grpc_endpoint* grpc_cfstream_endpoint_create(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
+ const char* peer_string, grpc_resource_quota* resource_quota,
+ CFStreamHandle* stream_sync) {
+ CFStreamEndpoint* ep_impl =
+ static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint)));
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG,
+ "CFStream endpoint:%p create readStream:%p writeStream: %p",
+ ep_impl, read_stream, write_stream);
+ }
+ ep_impl->base.vtable = &vtable;
+ gpr_ref_init(&ep_impl->refcount, 1);
+ ep_impl->read_stream = read_stream;
+ ep_impl->write_stream = write_stream;
+ CFRetain(read_stream);
+ CFRetain(write_stream);
+ ep_impl->stream_sync = stream_sync;
+ CFSTREAM_SYNC_REF(ep_impl->stream_sync, "endpoint create");
+
+ ep_impl->peer_string = gpr_strdup(peer_string);
+ ep_impl->read_cb = nil;
+ ep_impl->write_cb = nil;
+ ep_impl->read_slices = nil;
+ ep_impl->write_slices = nil;
+ GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
+ static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
+ static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
+ ep_impl->resource_user =
+ grpc_resource_user_create(resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator,
+ ep_impl->resource_user,
+ CFStreamReadAllocationDone, ep_impl);
+
+ return &ep_impl->base;
+}
+
+#endif /* GRPC_CFSTREAM_ENDPOINT */
diff --git a/src/core/lib/iomgr/tcp_cfstream.h b/src/core/lib/iomgr/endpoint_cfstream.h
index 9f52f6a5de..ef957c1f11 100644
--- a/src/core/lib/iomgr/tcp_cfstream.h
+++ b/src/core/lib/iomgr/endpoint_cfstream.h
@@ -16,8 +16,8 @@
*
*/
-#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H
-#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H
+#ifndef GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H
+#define GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H
/*
Low level TCP "bottom half" implementation, for use by transports built on
top of a TCP connection.
@@ -36,17 +36,14 @@
#import <CoreFoundation/CoreFoundation.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
#include "src/core/lib/iomgr/endpoint.h"
-#include "src/core/lib/iomgr/tcp_cfstream_sync.h"
-extern grpc_core::TraceFlag grpc_tcp_trace;
-
-grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream,
- CFWriteStreamRef write_stream,
- const char* peer_string,
- grpc_resource_quota* resource_quota,
- CFStreamSync* stream_sync);
+grpc_endpoint* grpc_cfstream_endpoint_create(
+ CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
+ const char* peer_string, grpc_resource_quota* resource_quota,
+ CFStreamHandle* stream_sync);
#endif /* GRPC_CFSTREAM */
-#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_H */
+#endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_CFSTREAM_H */
diff --git a/src/core/lib/iomgr/error_apple.cc b/src/core/lib/iomgr/error_apple.cc
index 95d69ecee9..d7af8c377f 100644
--- a/src/core/lib/iomgr/error_apple.cc
+++ b/src/core/lib/iomgr/error_apple.cc
@@ -31,7 +31,8 @@
grpc_error* grpc_error_create_from_cferror(const char* file, int line,
void* arg, const char* custom_desc) {
CFErrorRef error = static_cast<CFErrorRef>(arg);
- char buf_domain[MAX_ERROR_DESCRIPTION], buf_desc[MAX_ERROR_DESCRIPTION];
+ char buf_domain[MAX_ERROR_DESCRIPTION];
+ char buf_desc[MAX_ERROR_DESCRIPTION];
char* error_msg;
CFErrorDomain domain = CFErrorGetDomain((error));
CFIndex code = CFErrorGetCode((error));
diff --git a/src/core/lib/iomgr/polling_entity.cc b/src/core/lib/iomgr/polling_entity.cc
index 9c21e1f488..1ca3d9f31f 100644
--- a/src/core/lib/iomgr/polling_entity.cc
+++ b/src/core/lib/iomgr/polling_entity.cc
@@ -62,6 +62,8 @@ void grpc_polling_entity_add_to_pollset_set(grpc_polling_entity* pollent,
grpc_pollset_set* pss_dst) {
if (pollent->tag == GRPC_POLLS_POLLSET) {
#ifdef GRPC_CFSTREAM
+ // CFStream does not use file destriptors. When CFStream is used, the fd
+ // pollset is possible to be null.
if (pollent->pollent.pollset != nullptr) {
grpc_pollset_set_add_pollset(pss_dst, pollent->pollent.pollset);
}
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index a1a029e1bf..80d8e63cdd 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -99,8 +99,8 @@
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#ifdef GRPC_CFSTREAM
#define GRPC_POSIX_SOCKET_IOMGR 1
-#define GRPC_CFSTREAM_TCP 1
-#define GRPC_CFSTREAM_TCP_CLIENT 1
+#define GRPC_CFSTREAM_ENDPOINT 1
+#define GRPC_CFSTREAM_CLIENT 1
#define GRPC_POSIX_SOCKET_ARES_EV_DRIVER 1
#define GRPC_POSIX_SOCKET_EV 1
#define GRPC_POSIX_SOCKET_EV_EPOLL1 1
diff --git a/src/core/lib/iomgr/tcp_cfstream.cc b/src/core/lib/iomgr/tcp_cfstream.cc
deleted file mode 100644
index d232e78513..0000000000
--- a/src/core/lib/iomgr/tcp_cfstream.cc
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_CFSTREAM_TCP
-
-#import <CoreFoundation/CoreFoundation.h>
-#import "src/core/lib/iomgr/tcp_cfstream.h"
-
-#include <grpc/slice_buffer.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/lib/gpr/string.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/endpoint.h"
-#include "src/core/lib/iomgr/error_apple.h"
-#include "src/core/lib/iomgr/tcp_cfstream_sync.h"
-#include "src/core/lib/slice/slice_internal.h"
-#include "src/core/lib/slice/slice_string_helpers.h"
-
-extern grpc_core::TraceFlag grpc_tcp_trace;
-
-typedef struct {
- grpc_endpoint base;
- gpr_refcount refcount;
-
- CFReadStreamRef read_stream;
- CFWriteStreamRef write_stream;
- CFStreamSync* stream_sync;
-
- grpc_closure* read_cb;
- grpc_closure* write_cb;
- grpc_slice_buffer* read_slices;
- grpc_slice_buffer* write_slices;
-
- grpc_closure read_action;
- grpc_closure write_action;
- CFStreamEventType read_type;
-
- char* peer_string;
- grpc_resource_user* resource_user;
- grpc_resource_user_slice_allocator slice_allocator;
-} CFStreamTCP;
-
-static void TCPFree(CFStreamTCP* tcp) {
- grpc_resource_user_unref(tcp->resource_user);
- CFRelease(tcp->read_stream);
- CFRelease(tcp->write_stream);
- CFSTREAM_SYNC_UNREF(tcp->stream_sync, "free");
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
-}
-
-#ifndef NDEBUG
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file,
- int line) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
- val - 1);
- }
- if (gpr_unref(&tcp->refcount)) {
- TCPFree(tcp);
- }
-}
-static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file,
- int line) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
- val + 1);
- }
- gpr_ref(&tcp->refcount);
-}
-#else
-#define TCP_REF(tcp, reason) tcp_ref((tcp))
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
-static void tcp_unref(CFStreamTCP* tcp) {
- if (gpr_unref(&tcp->refcount)) {
- TCPFree(tcp);
- }
-}
-static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); }
-#endif
-
-static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) {
- return grpc_error_set_str(
- grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAVAILABLE),
- GRPC_ERROR_STR_TARGET_ADDRESS,
- grpc_slice_from_copied_string(tcp->peer_string));
-}
-
-static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) {
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb,
- tcp->read_cb->cb, tcp->read_cb->cb_arg);
- size_t i;
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "read: error=%s", str);
-
- for (i = 0; i < tcp->read_slices->count; i++) {
- char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
- gpr_free(dump);
- }
- }
- grpc_closure* cb = tcp->read_cb;
- tcp->read_cb = nullptr;
- tcp->read_slices = nullptr;
- GRPC_CLOSURE_SCHED(cb, error);
-}
-
-static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) {
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb,
- tcp->write_cb->cb, tcp->write_cb->cb_arg);
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "write: error=%s", str);
- }
- grpc_closure* cb = tcp->write_cb;
- tcp->write_cb = nullptr;
- tcp->write_slices = nullptr;
- GRPC_CLOSURE_SCHED(cb, error);
-}
-
-static void ReadAction(void* arg, grpc_error* error) {
- CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg);
- GPR_ASSERT(tcp->read_cb != nullptr);
- if (error) {
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- CallReadCB(tcp, GRPC_ERROR_REF(error));
- TCP_UNREF(tcp, "read");
- return;
- }
-
- GPR_ASSERT(tcp->read_slices->count == 1);
- grpc_slice slice = tcp->read_slices->slices[0];
- size_t len = GRPC_SLICE_LENGTH(slice);
- CFIndex read_size =
- CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len);
- if (read_size == -1) {
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream);
- if (stream_error != nullptr) {
- error = TCPAnnotateError(
- GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), tcp);
- CFRelease(stream_error);
- } else {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
- }
- CallReadCB(tcp, error);
- TCP_UNREF(tcp, "read");
- } else if (read_size == 0) {
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- CallReadCB(tcp,
- TCPAnnotateError(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
- TCP_UNREF(tcp, "read");
- } else {
- if (read_size < len) {
- grpc_slice_buffer_trim_end(tcp->read_slices, len - read_size, nullptr);
- }
- CallReadCB(tcp, GRPC_ERROR_NONE);
- TCP_UNREF(tcp, "read");
- }
-}
-
-static void WriteAction(void* arg, grpc_error* error) {
- CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg);
- GPR_ASSERT(tcp->write_cb != nullptr);
- if (error) {
- grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices);
- CallWriteCB(tcp, GRPC_ERROR_REF(error));
- TCP_UNREF(tcp, "write");
- return;
- }
-
- grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices);
- size_t slice_len = GRPC_SLICE_LENGTH(slice);
- CFIndex write_size = CFWriteStreamWrite(
- tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
- if (write_size == -1) {
- grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices);
- CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream);
- if (stream_error != nullptr) {
- error = TCPAnnotateError(
- GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), tcp);
- CFRelease(stream_error);
- } else {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
- }
- CallWriteCB(tcp, error);
- TCP_UNREF(tcp, "write");
- } else {
- if (write_size < GRPC_SLICE_LENGTH(slice)) {
- grpc_slice_buffer_undo_take_first(
- tcp->write_slices, grpc_slice_sub(slice, write_size, slice_len));
- }
- if (tcp->write_slices->length > 0) {
- tcp->stream_sync->NotifyOnWrite(&tcp->write_action);
- } else {
- CallWriteCB(tcp, GRPC_ERROR_NONE);
- TCP_UNREF(tcp, "write");
- }
-
- if (grpc_tcp_trace.enabled()) {
- grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
- char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump);
- gpr_free(dump);
- grpc_slice_unref_internal(trace_slice);
- }
- }
- grpc_slice_unref_internal(slice);
-}
-
-static void TCPReadAllocationDone(void* arg, grpc_error* error) {
- CFStreamTCP* tcp = static_cast<CFStreamTCP*>(arg);
- if (error == GRPC_ERROR_NONE) {
- tcp->stream_sync->NotifyOnRead(&tcp->read_action);
- } else {
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- CallReadCB(tcp, error);
- TCP_UNREF(tcp, "read");
- }
-}
-
-static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
- CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb,
- slices->length);
- }
- GPR_ASSERT(tcp->read_cb == nullptr);
- tcp->read_cb = cb;
- tcp->read_slices = slices;
- grpc_slice_buffer_reset_and_unref_internal(slices);
- grpc_resource_user_alloc_slices(&tcp->slice_allocator,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
- tcp->read_slices);
- TCP_REF(tcp, "read");
-}
-
-static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
- CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb,
- slices->length);
- }
- GPR_ASSERT(tcp->write_cb == nullptr);
- tcp->write_cb = cb;
- tcp->write_slices = slices;
- TCP_REF(tcp, "write");
- tcp->stream_sync->NotifyOnWrite(&tcp->write_action);
-}
-
-void TCPShutdown(grpc_endpoint* ep, grpc_error* why) {
- CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p shutdown (%p)", tcp, why);
- }
- CFReadStreamClose(tcp->read_stream);
- CFWriteStreamClose(tcp->write_stream);
- tcp->stream_sync->Shutdown(why);
- grpc_resource_user_shutdown(tcp->resource_user);
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p shutdown DONE (%p)", tcp, why);
- }
-}
-
-void TCPDestroy(grpc_endpoint* ep) {
- CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p destroy", tcp);
- }
- TCP_UNREF(tcp, "destroy");
-}
-
-grpc_resource_user* TCPGetResourceUser(grpc_endpoint* ep) {
- CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
- return tcp->resource_user;
-}
-
-char* TCPGetPeer(grpc_endpoint* ep) {
- CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
- return gpr_strdup(tcp->peer_string);
-}
-
-int TCPGetFD(grpc_endpoint* ep) { return 0; }
-
-void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
-void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
-void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
-
-static const grpc_endpoint_vtable vtable = {TCPRead,
- TCPWrite,
- TCPAddToPollset,
- TCPAddToPollsetSet,
- TCPDeleteFromPollsetSet,
- TCPShutdown,
- TCPDestroy,
- TCPGetResourceUser,
- TCPGetPeer,
- TCPGetFD};
-
-grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream,
- CFWriteStreamRef write_stream,
- const char* peer_string,
- grpc_resource_quota* resource_quota,
- CFStreamSync* stream_sync) {
- CFStreamTCP* tcp = static_cast<CFStreamTCP*>(gpr_malloc(sizeof(CFStreamTCP)));
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp,
- read_stream, write_stream);
- }
- tcp->base.vtable = &vtable;
- gpr_ref_init(&tcp->refcount, 1);
- tcp->read_stream = read_stream;
- tcp->write_stream = write_stream;
- CFRetain(read_stream);
- CFRetain(write_stream);
- tcp->stream_sync = stream_sync;
- CFSTREAM_SYNC_REF(tcp->stream_sync, "endpoint create");
-
- tcp->peer_string = gpr_strdup(peer_string);
- tcp->read_cb = nil;
- tcp->write_cb = nil;
- tcp->read_slices = nil;
- tcp->write_slices = nil;
- GRPC_CLOSURE_INIT(&tcp->read_action, ReadAction, static_cast<void*>(tcp),
- grpc_schedule_on_exec_ctx);
- GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast<void*>(tcp),
- grpc_schedule_on_exec_ctx);
- tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
- grpc_resource_user_slice_allocator_init(
- &tcp->slice_allocator, tcp->resource_user, TCPReadAllocationDone, tcp);
-
- return &tcp->base;
-}
-
-#endif /* GRPC_CFSTREAM_TCP */
diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.cc b/src/core/lib/iomgr/tcp_cfstream_sync.cc
deleted file mode 100644
index 07ee53311c..0000000000
--- a/src/core/lib/iomgr/tcp_cfstream_sync.cc
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_CFSTREAM
-#import <CoreFoundation/CoreFoundation.h>
-#import "src/core/lib/iomgr/tcp_cfstream_sync.h"
-
-#include <grpc/support/atm.h>
-#include <grpc/support/sync.h>
-
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-
-extern grpc_core::TraceFlag grpc_tcp_trace;
-
-void* CFStreamSync::Retain(void* info) {
- CFStreamSync* sync = static_cast<CFStreamSync*>(info);
- CFSTREAM_SYNC_REF(sync, "retain");
- return info;
-}
-
-void CFStreamSync::Release(void* info) {
- CFStreamSync* sync = static_cast<CFStreamSync*>(info);
- CFSTREAM_SYNC_UNREF(sync, "release");
-}
-
-CFStreamSync* CFStreamSync::CreateStreamSync(CFReadStreamRef read_stream,
- CFWriteStreamRef write_stream) {
- return new CFStreamSync(read_stream, write_stream);
-}
-
-void CFStreamSync::ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
- void* client_callback_info) {
- CFStreamSync* sync = static_cast<CFStreamSync*>(client_callback_info);
- CFSTREAM_SYNC_REF(sync, "read callback");
- dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
- ^{
- grpc_core::ExecCtx exec_ctx;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %p, %lu, %p)",
- sync, stream, type, client_callback_info);
- }
- switch (type) {
- case kCFStreamEventOpenCompleted:
- sync->open_event_.SetReady();
- break;
- case kCFStreamEventHasBytesAvailable:
- case kCFStreamEventEndEncountered:
- sync->read_event_.SetReady();
- break;
- case kCFStreamEventErrorOccurred:
- sync->open_event_.SetReady();
- sync->read_event_.SetReady();
- break;
- default:
- // Impossible
- abort();
- }
- CFSTREAM_SYNC_UNREF(sync, "read callback");
- });
-}
-void CFStreamSync::WriteCallback(CFWriteStreamRef stream,
- CFStreamEventType type,
- void* clientCallBackInfo) {
- CFStreamSync* sync = static_cast<CFStreamSync*>(clientCallBackInfo);
- CFSTREAM_SYNC_REF(sync, "write callback");
- dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
- ^{
- grpc_core::ExecCtx exec_ctx;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %p, %lu, %p)",
- sync, stream, type, clientCallBackInfo);
- }
- switch (type) {
- case kCFStreamEventOpenCompleted:
- sync->open_event_.SetReady();
- break;
- case kCFStreamEventCanAcceptBytes:
- case kCFStreamEventEndEncountered:
- sync->write_event_.SetReady();
- break;
- case kCFStreamEventErrorOccurred:
- sync->open_event_.SetReady();
- sync->write_event_.SetReady();
- break;
- default:
- // Impossible
- abort();
- }
- CFSTREAM_SYNC_UNREF(sync, "write callback");
- });
-}
-
-CFStreamSync::CFStreamSync(CFReadStreamRef read_stream,
- CFWriteStreamRef write_stream) {
- gpr_ref_init(&refcount_, 1);
- open_event_.InitEvent();
- read_event_.InitEvent();
- write_event_.InitEvent();
- CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil};
- CFReadStreamSetClient(
- read_stream,
- kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
- kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
- CFStreamSync::ReadCallback, &ctx);
- CFWriteStreamSetClient(
- write_stream,
- kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
- kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
- CFStreamSync::WriteCallback, &ctx);
- CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
- kCFRunLoopCommonModes);
- CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
- kCFRunLoopCommonModes);
-}
-
-CFStreamSync::~CFStreamSync() {
- open_event_.DestroyEvent();
- read_event_.DestroyEvent();
- write_event_.DestroyEvent();
-}
-
-void CFStreamSync::NotifyOnOpen(grpc_closure* closure) {
- open_event_.NotifyOn(closure);
-}
-
-void CFStreamSync::NotifyOnRead(grpc_closure* closure) {
- read_event_.NotifyOn(closure);
-}
-
-void CFStreamSync::NotifyOnWrite(grpc_closure* closure) {
- write_event_.NotifyOn(closure);
-}
-
-void CFStreamSync::Shutdown(grpc_error* error) {
- open_event_.SetShutdown(GRPC_ERROR_REF(error));
- read_event_.SetShutdown(GRPC_ERROR_REF(error));
- write_event_.SetShutdown(GRPC_ERROR_REF(error));
- GRPC_ERROR_UNREF(error);
-}
-
-void CFStreamSync::Ref(const char* file, int line, const char* reason) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val,
- val + 1);
- }
- gpr_ref(&refcount_);
-}
-
-void CFStreamSync::Unref(const char* file, int line, const char* reason) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
- gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
- reason, val, val - 1);
- }
- if (gpr_unref(&refcount_)) {
- delete this;
- }
-}
-
-#endif
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
index 7deaece904..552f81a506 100644
--- a/src/core/lib/iomgr/tcp_client_cfstream.cc
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -21,7 +21,7 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_CFSTREAM_TCP_CLIENT
+#ifdef GRPC_CFSTREAM_CLIENT
#include <CoreFoundation/CoreFoundation.h>
@@ -35,24 +35,24 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/iomgr/cfstream_handle.h"
#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/endpoint_cfstream.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/error_apple.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/iomgr/tcp_cfstream.h"
-#include "src/core/lib/iomgr/tcp_cfstream_sync.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
-typedef struct CFStreamTCPConnect {
+typedef struct CFStreamConnect {
gpr_mu mu;
gpr_refcount refcount;
CFReadStreamRef read_stream;
CFWriteStreamRef write_stream;
- CFStreamSync* stream_sync;
+ CFStreamHandle* stream_sync;
grpc_timer alarm;
grpc_closure on_alarm;
@@ -67,9 +67,9 @@ typedef struct CFStreamTCPConnect {
int refs;
char* addr_name;
grpc_resource_quota* resource_quota;
-} CFStreamTCPConnect;
+} CFStreamConnect;
-static void TCPConnectCleanup(CFStreamTCPConnect* connect) {
+static void CFStreamConnectCleanup(CFStreamConnect* connect) {
grpc_resource_quota_unref_internal(connect->resource_quota);
CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up");
CFRelease(connect->read_stream);
@@ -80,7 +80,7 @@ static void TCPConnectCleanup(CFStreamTCPConnect* connect) {
}
static void OnAlarm(void* arg, grpc_error* error) {
- CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg);
+ CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error);
}
@@ -92,7 +92,7 @@ static void OnAlarm(void* arg, grpc_error* error) {
// Only schedule a callback once, by either on_timer or on_connected. The
// first one issues callback while the second one does cleanup.
if (done) {
- TCPConnectCleanup(connect);
+ CFStreamConnectCleanup(connect);
} else {
grpc_error* error =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
@@ -101,7 +101,7 @@ static void OnAlarm(void* arg, grpc_error* error) {
}
static void OnOpen(void* arg, grpc_error* error) {
- CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg);
+ CFStreamConnect* connect = static_cast<CFStreamConnect*>(arg);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error);
}
@@ -115,7 +115,7 @@ static void OnOpen(void* arg, grpc_error* error) {
if (done) {
gpr_mu_unlock(&connect->mu);
- TCPConnectCleanup(connect);
+ CFStreamConnectCleanup(connect);
} else {
if (error == GRPC_ERROR_NONE) {
CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
@@ -127,9 +127,9 @@ static void OnOpen(void* arg, grpc_error* error) {
CFRelease(stream_error);
}
if (error == GRPC_ERROR_NONE) {
- *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream,
- connect->addr_name, connect->resource_quota,
- connect->stream_sync);
+ *endpoint = grpc_cfstream_endpoint_create(
+ connect->read_stream, connect->write_stream, connect->addr_name,
+ connect->resource_quota, connect->stream_sync);
}
}
gpr_mu_unlock(&connect->mu);
@@ -149,14 +149,14 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr,
*port = grpc_sockaddr_get_port(addr);
}
-static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties,
- const grpc_channel_args* channel_args,
- const grpc_resolved_address* resolved_addr,
- grpc_millis deadline) {
- CFStreamTCPConnect* connect;
+static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* resolved_addr,
+ grpc_millis deadline) {
+ CFStreamConnect* connect;
- connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect));
+ connect = (CFStreamConnect*)gpr_zalloc(sizeof(CFStreamConnect));
connect->closure = closure;
connect->endpoint = ep;
connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
@@ -194,7 +194,7 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
connect->read_stream = read_stream;
connect->write_stream = write_stream;
connect->stream_sync =
- CFStreamSync::CreateStreamSync(read_stream, write_stream);
+ CFStreamHandle::CreateStreamSync(read_stream, write_stream);
GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
grpc_schedule_on_exec_ctx);
connect->stream_sync->NotifyOnOpen(&connect->on_open);
@@ -207,6 +207,6 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
gpr_mu_unlock(&connect->mu);
}
-grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect};
+grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {CFStreamClientConnect};
-#endif /* GRPC_CFSTREAM_TCP_CLIENT */
+#endif /* GRPC_CFSTREAM_CLIENT */