diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/tcp_cfstream_sync.h | 79 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_cfstream_sync.mm | 159 |
2 files changed, 238 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.h b/src/core/lib/iomgr/tcp_cfstream_sync.h new file mode 100644 index 0000000000..06e225485f --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream_sync.h @@ -0,0 +1,79 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H +#define GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H + +#include <grpc/support/port_platform.h> + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <Foundation/Foundation.h> + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/lockfree_event.h" + +class CFStreamSync final { + public: + static CFStreamSync* CreateStreamSync(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream); + ~CFStreamSync() {} + CFStreamSync(const CFReadStreamRef& ref) = delete; + CFStreamSync(CFReadStreamRef&& ref) = delete; + CFStreamSync& operator=(const CFStreamSync& rhs) = delete; + + void NotifyOnOpen(grpc_closure* closure); + void NotifyOnRead(grpc_closure* closure); + void NotifyOnWrite(grpc_closure* closure); + void Shutdown(grpc_error* error); + + void Ref(const char* file = nullptr, int line = 0, + const char* reason = nullptr); + void Unref(const char* file = nullptr, int line = 0, + const char* reason = nullptr); + + private: + CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream); + static void ReadCallback(CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, + void* client_callback_info); + static void* Retain(void* info); + static void Release(void* info); + + grpc_core::LockfreeEvent open_event_; + grpc_core::LockfreeEvent read_event_; + grpc_core::LockfreeEvent write_event_; + + gpr_refcount refcount_; +}; + +#ifndef NDEBUG +#define CFSTREAM_SYNC_REF(sync, reason) \ + (sync)->Ref(__FILE__, __LINE__, (reason)) +#define CFSTREAM_SYNC_UNREF(sync, reason) \ + (sync)->Unref(__FILE__, __LINE__, (reason)) +#else +#define CFSTREAM_SYNC_REF(sync, reason) (sync)->Ref() +#define CFSTREAM_SYNC_UNREF(sync, reason) (sync)->Unref() +#endif + +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_CFSTREAM_SYNC_H */ diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.mm b/src/core/lib/iomgr/tcp_cfstream_sync.mm new file mode 100644 index 0000000000..e2b10f927b --- /dev/null +++ b/src/core/lib/iomgr/tcp_cfstream_sync.mm @@ -0,0 +1,159 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM +#import <Foundation/Foundation.h> +#import "src/core/lib/iomgr/tcp_cfstream_sync.h" + +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +void* CFStreamSync::Retain(void* info) { + CFStreamSync* sync = static_cast<CFStreamSync*>(info); + CFSTREAM_SYNC_REF(sync, "retain"); + return info; +} + +void CFStreamSync::Release(void* info) { + CFStreamSync* sync = static_cast<CFStreamSync*>(info); + CFSTREAM_SYNC_UNREF(sync, "release"); +} + +CFStreamSync* CFStreamSync::CreateStreamSync(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream) { + return new CFStreamSync(read_stream, write_stream); +} + +void CFStreamSync::ReadCallback(CFReadStreamRef stream, CFStreamEventType type, + void* client_callback_info) { + CFStreamSync* sync = static_cast<CFStreamSync*>(client_callback_info); + CFSTREAM_SYNC_REF(sync, "read callback"); + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %lu, %p)", stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + sync->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + sync->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + sync->open_event_.SetReady(); + sync->read_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(sync, "read callback"); + }); +} +void CFStreamSync::WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, + void* clientCallBackInfo) { + CFStreamSync* sync = static_cast<CFStreamSync*>(clientCallBackInfo); + CFSTREAM_SYNC_REF(sync, "write callback"); + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %lu, %p)", stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + sync->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + sync->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + sync->open_event_.SetReady(); + sync->write_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(sync, "write callback"); + }); +} + +CFStreamSync::CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream) { + gpr_ref_init(&refcount_, 1); + open_event_.InitEvent(); + read_event_.InitEvent(); + write_event_.InitEvent(); + CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; + CFReadStreamSetClient(read_stream, + kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamSync::ReadCallback, &ctx); + CFWriteStreamSetClient(write_stream, + kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamSync::WriteCallback, &ctx); + CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), kCFRunLoopCommonModes); + CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), kCFRunLoopCommonModes); +} + +void CFStreamSync::NotifyOnOpen(grpc_closure* closure) { open_event_.NotifyOn(closure); } + +void CFStreamSync::NotifyOnRead(grpc_closure* closure) { read_event_.NotifyOn(closure); } + +void CFStreamSync::NotifyOnWrite(grpc_closure* closure) { write_event_.NotifyOn(closure); } + +void CFStreamSync::Shutdown(grpc_error* error) { + open_event_.SetShutdown(error); + read_event_.SetShutdown(error); + write_event_.SetShutdown(error); + GRPC_ERROR_UNREF(error); +} + +void CFStreamSync::Ref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, + this, reason, val, val + 1); + } + gpr_ref(&refcount_); +} + +void CFStreamSync::Unref(const char* file, int line, const char* reason) { + if (grpc_tcp_trace.enabled()) { + gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); + gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val, + val - 1); + } + if (gpr_unref(&refcount_)) { + delete this; + } +} + +#endif |