/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "src/core/lib/iomgr/port.h" #ifdef GRPC_CFSTREAM_ENDPOINT #import #import "src/core/lib/iomgr/endpoint_cfstream.h" #include #include #include #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/cfstream_handle.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/error_cfstream.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" extern grpc_core::TraceFlag grpc_tcp_trace; typedef struct { grpc_endpoint base; gpr_refcount refcount; CFReadStreamRef read_stream; CFWriteStreamRef write_stream; CFStreamHandle* stream_sync; grpc_closure* read_cb; grpc_closure* write_cb; grpc_slice_buffer* read_slices; grpc_slice_buffer* write_slices; grpc_closure read_action; grpc_closure write_action; char* peer_string; grpc_resource_user* resource_user; grpc_resource_user_slice_allocator slice_allocator; } CFStreamEndpoint; static void CFStreamFree(CFStreamEndpoint* ep) { grpc_resource_user_unref(ep->resource_user); CFRelease(ep->read_stream); CFRelease(ep->write_stream); CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free"); gpr_free(ep->peer_string); gpr_free(ep); } #ifndef NDEBUG #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__) #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__) static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason, const char* file, int line) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val, val - 1); } if (gpr_unref(&ep->refcount)) { CFStreamFree(ep); } } static void CFStreamRef(CFStreamEndpoint* ep, const char* reason, const char* file, int line) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val, val + 1); } gpr_ref(&ep->refcount); } #else #define EP_REF(ep, reason) CFStreamRef((ep)) #define EP_UNREF(ep, reason) CFStreamUnref((ep)) static void CFStreamUnref(CFStreamEndpoint* ep) { if (gpr_unref(&ep->refcount)) { CFStreamFree(ep); } } static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); } #endif static grpc_error* CFStreamAnnotateError(grpc_error* src_error, CFStreamEndpoint* ep) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(ep->peer_string)); } static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep, ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg); size_t i; const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "read: error=%s", str); for (i = 0; i < ep->read_slices->count; i++) { char* dump = grpc_dump_slice(ep->read_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump); gpr_free(dump); } } grpc_closure* cb = ep->read_cb; ep->read_cb = nullptr; ep->read_slices = nullptr; GRPC_CLOSURE_SCHED(cb, error); } static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep, ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg); const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: error=%s", str); } grpc_closure* cb = ep->write_cb; ep->write_cb = nullptr; ep->write_slices = nullptr; GRPC_CLOSURE_SCHED(cb, error); } static void ReadAction(void* arg, grpc_error* error) { CFStreamEndpoint* ep = static_cast(arg); GPR_ASSERT(ep->read_cb != nullptr); if (error) { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CallReadCb(ep, GRPC_ERROR_REF(error)); EP_UNREF(ep, "read"); return; } GPR_ASSERT(ep->read_slices->count == 1); grpc_slice slice = ep->read_slices->slices[0]; size_t len = GRPC_SLICE_LENGTH(slice); CFIndex read_size = CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len); if (read_size == -1) { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream); if (stream_error != nullptr) { error = CFStreamAnnotateError( GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep); CFRelease(stream_error); } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error"); } CallReadCb(ep, error); EP_UNREF(ep, "read"); } else if (read_size == 0) { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CallReadCb(ep, CFStreamAnnotateError( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); EP_UNREF(ep, "read"); } else { if (read_size < len) { grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); } CallReadCb(ep, GRPC_ERROR_NONE); EP_UNREF(ep, "read"); } } static void WriteAction(void* arg, grpc_error* error) { CFStreamEndpoint* ep = static_cast(arg); GPR_ASSERT(ep->write_cb != nullptr); if (error) { grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); CallWriteCb(ep, GRPC_ERROR_REF(error)); EP_UNREF(ep, "write"); return; } grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices); size_t slice_len = GRPC_SLICE_LENGTH(slice); CFIndex write_size = CFWriteStreamWrite( ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); if (write_size == -1) { grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream); if (stream_error != nullptr) { error = CFStreamAnnotateError( GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep); CFRelease(stream_error); } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed."); } CallWriteCb(ep, error); EP_UNREF(ep, "write"); } else { if (write_size < GRPC_SLICE_LENGTH(slice)) { grpc_slice_buffer_undo_take_first( ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); } if (ep->write_slices->length > 0) { ep->stream_sync->NotifyOnWrite(&ep->write_action); } else { CallWriteCb(ep, GRPC_ERROR_NONE); EP_UNREF(ep, "write"); } if (grpc_tcp_trace.enabled()) { grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump); gpr_free(dump); grpc_slice_unref_internal(trace_slice); } } grpc_slice_unref_internal(slice); } static void CFStreamReadAllocationDone(void* arg, grpc_error* error) { CFStreamEndpoint* ep = static_cast(arg); if (error == GRPC_ERROR_NONE) { ep->stream_sync->NotifyOnRead(&ep->read_action); } else { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CallReadCb(ep, error); EP_UNREF(ep, "read"); } } static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, slices, cb, slices->length); } GPR_ASSERT(ep_impl->read_cb == nullptr); ep_impl->read_cb = cb; ep_impl->read_slices = slices; grpc_slice_buffer_reset_and_unref_internal(slices); grpc_resource_user_alloc_slices(&ep_impl->slice_allocator, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, ep_impl->read_slices); EP_REF(ep_impl, "read"); } static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void* arg) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", ep_impl, slices, cb, slices->length); } GPR_ASSERT(ep_impl->write_cb == nullptr); ep_impl->write_cb = cb; ep_impl->write_slices = slices; EP_REF(ep_impl, "write"); ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action); } void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why); } CFReadStreamClose(ep_impl->read_stream); CFWriteStreamClose(ep_impl->write_stream); ep_impl->stream_sync->Shutdown(why); grpc_resource_user_shutdown(ep_impl->resource_user); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why); } } void CFStreamDestroy(grpc_endpoint* ep) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl); } EP_UNREF(ep_impl, "destroy"); } grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); return ep_impl->resource_user; } char* CFStreamGetPeer(grpc_endpoint* ep) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); return gpr_strdup(ep_impl->peer_string); } int CFStreamGetFD(grpc_endpoint* ep) { return 0; } bool CFStreamCanTrackErr(grpc_endpoint* ep) { return false; } void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} static const grpc_endpoint_vtable vtable = {CFStreamRead, CFStreamWrite, CFStreamAddToPollset, CFStreamAddToPollsetSet, CFStreamDeleteFromPollsetSet, CFStreamShutdown, CFStreamDestroy, CFStreamGetResourceUser, CFStreamGetPeer, CFStreamGetFD, CFStreamCanTrackErr}; grpc_endpoint* grpc_cfstream_endpoint_create( CFReadStreamRef read_stream, CFWriteStreamRef write_stream, const char* peer_string, grpc_resource_quota* resource_quota, CFStreamHandle* stream_sync) { CFStreamEndpoint* ep_impl = static_cast(gpr_malloc(sizeof(CFStreamEndpoint))); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p create readStream:%p writeStream: %p", ep_impl, read_stream, write_stream); } ep_impl->base.vtable = &vtable; gpr_ref_init(&ep_impl->refcount, 1); ep_impl->read_stream = read_stream; ep_impl->write_stream = write_stream; CFRetain(read_stream); CFRetain(write_stream); ep_impl->stream_sync = stream_sync; CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create"); ep_impl->peer_string = gpr_strdup(peer_string); ep_impl->read_cb = nil; ep_impl->write_cb = nil; ep_impl->read_slices = nil; ep_impl->write_slices = nil; GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction, static_cast(ep_impl), grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction, static_cast(ep_impl), grpc_schedule_on_exec_ctx); ep_impl->resource_user = grpc_resource_user_create(resource_quota, peer_string); grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator, ep_impl->resource_user, CFStreamReadAllocationDone, ep_impl); return &ep_impl->base; } #endif /* GRPC_CFSTREAM_ENDPOINT */