aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/error_apple.cc18
-rw-r--r--src/core/lib/iomgr/tcp_cfstream.cc107
-rw-r--r--src/core/lib/iomgr/tcp_cfstream_sync.cc148
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc31
-rwxr-xr-xtools/run_tests/sanity/core_banned_functions.py2
5 files changed, 181 insertions, 125 deletions
diff --git a/src/core/lib/iomgr/error_apple.cc b/src/core/lib/iomgr/error_apple.cc
index 42ab6ad4b8..95d69ecee9 100644
--- a/src/core/lib/iomgr/error_apple.cc
+++ b/src/core/lib/iomgr/error_apple.cc
@@ -28,21 +28,23 @@
#define MAX_ERROR_DESCRIPTION 256
-grpc_error* grpc_error_create_from_cferror(const char* file, int line, void* arg,
- const char* custom_desc) {
+grpc_error* grpc_error_create_from_cferror(const char* file, int line,
+ void* arg, const char* custom_desc) {
CFErrorRef error = static_cast<CFErrorRef>(arg);
char buf_domain[MAX_ERROR_DESCRIPTION], buf_desc[MAX_ERROR_DESCRIPTION];
char* error_msg;
CFErrorDomain domain = CFErrorGetDomain((error));
CFIndex code = CFErrorGetCode((error));
CFStringRef desc = CFErrorCopyDescription((error));
- CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION, kCFStringEncodingUTF8);
- CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION, kCFStringEncodingUTF8);
- gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)", custom_desc,
- buf_domain, code, buf_desc);
+ CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION,
+ kCFStringEncodingUTF8);
+ gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)",
+ custom_desc, buf_domain, code, buf_desc);
CFRelease(desc);
- grpc_error* return_error =
- grpc_error_create(file, line, grpc_slice_from_copied_string(error_msg), NULL, 0);
+ grpc_error* return_error = grpc_error_create(
+ file, line, grpc_slice_from_copied_string(error_msg), NULL, 0);
gpr_free(error_msg);
return return_error;
}
diff --git a/src/core/lib/iomgr/tcp_cfstream.cc b/src/core/lib/iomgr/tcp_cfstream.cc
index e21c983671..f7ef03d228 100644
--- a/src/core/lib/iomgr/tcp_cfstream.cc
+++ b/src/core/lib/iomgr/tcp_cfstream.cc
@@ -17,6 +17,7 @@
*/
#include <grpc/support/port_platform.h>
+
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_CFSTREAM_TCP
@@ -72,21 +73,25 @@ static void TCPFree(CFStreamTCP* tcp) {
#ifndef NDEBUG
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file, int line) {
+static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file,
+ int line) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp,
- reason, val, val - 1);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
+ val - 1);
}
if (gpr_unref(&tcp->refcount)) {
TCPFree(tcp);
}
}
-static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file, int line) {
+static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file,
+ int line) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp,
- reason, val, val + 1);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
+ val + 1);
}
gpr_ref(&tcp->refcount);
}
@@ -103,20 +108,23 @@ static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); }
static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) {
return grpc_error_set_str(
- grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
- GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(tcp->peer_string));
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE),
+ GRPC_ERROR_STR_TARGET_ADDRESS,
+ grpc_slice_from_copied_string(tcp->peer_string));
}
static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) {
if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb, tcp->read_cb->cb,
- tcp->read_cb->cb_arg);
+ gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb,
+ tcp->read_cb->cb, tcp->read_cb->cb_arg);
size_t i;
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "read: error=%s", str);
for (i = 0; i < tcp->read_slices->count; i++) {
- char* dump = grpc_dump_slice(tcp->read_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
}
@@ -129,8 +137,8 @@ static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) {
static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) {
if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb, tcp->write_cb->cb,
- tcp->write_cb->cb_arg);
+ gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb,
+ tcp->write_cb->cb, tcp->write_cb->cb_arg);
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "write: error=%s", str);
}
@@ -153,17 +161,21 @@ static void ReadAction(void* arg, grpc_error* error) {
GPR_ASSERT(tcp->read_slices->count == 1);
grpc_slice slice = tcp->read_slices->slices[0];
size_t len = GRPC_SLICE_LENGTH(slice);
- CFIndex read_size = CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len);
+ CFIndex read_size =
+ CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len);
if (read_size == -1) {
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream);
- CallReadCB(tcp,
- TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), tcp));
+ CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(
+ stream_error, "Read error"),
+ tcp));
CFRelease(stream_error);
TCP_UNREF(tcp, "read");
} else if (read_size == 0) {
grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
+ CallReadCB(tcp,
+ TCPAnnotateError(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
TCP_UNREF(tcp, "read");
} else {
if (read_size < len) {
@@ -186,19 +198,20 @@ static void WriteAction(void* arg, grpc_error* error) {
grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices);
size_t slice_len = GRPC_SLICE_LENGTH(slice);
- CFIndex write_size =
- CFWriteStreamWrite(tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
+ CFIndex write_size = CFWriteStreamWrite(
+ tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
if (write_size == -1) {
grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices);
CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream);
- CallWriteCB(
- tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), tcp));
+ CallWriteCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(
+ stream_error, "write failed."),
+ tcp));
CFRelease(stream_error);
TCP_UNREF(tcp, "write");
} else {
if (write_size < GRPC_SLICE_LENGTH(slice)) {
- grpc_slice_buffer_undo_take_first(tcp->write_slices,
- grpc_slice_sub(slice, write_size, slice_len));
+ grpc_slice_buffer_undo_take_first(
+ tcp->write_slices, grpc_slice_sub(slice, write_size, slice_len));
}
if (tcp->write_slices->length > 0) {
tcp->stream_sync->NotifyOnWrite(&tcp->write_action);
@@ -212,10 +225,10 @@ static void WriteAction(void* arg, grpc_error* error) {
char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
- grpc_slice_unref(trace_slice);
+ grpc_slice_unref_internal(trace_slice);
}
}
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
}
static void TCPReadAllocationDone(void* arg, grpc_error* error) {
@@ -229,24 +242,29 @@ static void TCPReadAllocationDone(void* arg, grpc_error* error) {
}
}
-static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) {
+static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb, slices->length);
+ gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb,
+ slices->length);
}
GPR_ASSERT(tcp->read_cb == nullptr);
tcp->read_cb = cb;
tcp->read_slices = slices;
grpc_slice_buffer_reset_and_unref_internal(slices);
- grpc_resource_user_alloc_slices(&tcp->slice_allocator, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
+ grpc_resource_user_alloc_slices(&tcp->slice_allocator,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
tcp->read_slices);
TCP_REF(tcp, "read");
}
-static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) {
+static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
+ grpc_closure* cb) {
CFStreamTCP* tcp = reinterpret_cast<CFStreamTCP*>(ep);
if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb, slices->length);
+ gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb,
+ slices->length);
}
GPR_ASSERT(tcp->write_cb == nullptr);
tcp->write_cb = cb;
@@ -290,17 +308,26 @@ void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
-static const grpc_endpoint_vtable vtable = {
- TCPRead, TCPWrite, TCPAddToPollset, TCPAddToPollsetSet, TCPDeleteFromPollsetSet,
- TCPShutdown, TCPDestroy, TCPGetResourceUser, TCPGetPeer, TCPGetFD};
-
-grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
- const char* peer_string, grpc_resource_quota* resource_quota,
+static const grpc_endpoint_vtable vtable = {TCPRead,
+ TCPWrite,
+ TCPAddToPollset,
+ TCPAddToPollsetSet,
+ TCPDeleteFromPollsetSet,
+ TCPShutdown,
+ TCPDestroy,
+ TCPGetResourceUser,
+ TCPGetPeer,
+ TCPGetFD};
+
+grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream,
+ const char* peer_string,
+ grpc_resource_quota* resource_quota,
CFStreamSync* stream_sync) {
CFStreamTCP* tcp = static_cast<CFStreamTCP*>(gpr_malloc(sizeof(CFStreamTCP)));
if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp, read_stream,
- write_stream);
+ gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp,
+ read_stream, write_stream);
}
tcp->base.vtable = &vtable;
gpr_ref_init(&tcp->refcount, 1);
@@ -321,8 +348,8 @@ grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, CFWriteStreamRef wri
GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast<void*>(tcp),
grpc_schedule_on_exec_ctx);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
- grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, tcp->resource_user,
- TCPReadAllocationDone, tcp);
+ grpc_resource_user_slice_allocator_init(
+ &tcp->slice_allocator, tcp->resource_user, TCPReadAllocationDone, tcp);
return &tcp->base;
}
diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.cc b/src/core/lib/iomgr/tcp_cfstream_sync.cc
index bc5bd0a600..5571db7491 100644
--- a/src/core/lib/iomgr/tcp_cfstream_sync.cc
+++ b/src/core/lib/iomgr/tcp_cfstream_sync.cc
@@ -17,6 +17,7 @@
*/
#include <grpc/support/port_platform.h>
+
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_CFSTREAM
@@ -52,75 +53,85 @@ void CFStreamSync::ReadCallback(CFReadStreamRef stream, CFStreamEventType type,
void* client_callback_info) {
CFStreamSync* sync = static_cast<CFStreamSync*>(client_callback_info);
CFSTREAM_SYNC_REF(sync, "read callback");
- dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
- grpc_core::ExecCtx exec_ctx;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %lu, %p)", stream, type, client_callback_info);
- }
- switch (type) {
- case kCFStreamEventOpenCompleted:
- sync->open_event_.SetReady();
- break;
- case kCFStreamEventHasBytesAvailable:
- case kCFStreamEventEndEncountered:
- sync->read_event_.SetReady();
- break;
- case kCFStreamEventErrorOccurred:
- sync->open_event_.SetReady();
- sync->read_event_.SetReady();
- break;
- default:
- // Impossible
- abort();
- }
- CFSTREAM_SYNC_UNREF(sync, "read callback");
- });
+ dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
+ ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %lu, %p)",
+ stream, type, client_callback_info);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ sync->open_event_.SetReady();
+ break;
+ case kCFStreamEventHasBytesAvailable:
+ case kCFStreamEventEndEncountered:
+ sync->read_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ sync->open_event_.SetReady();
+ sync->read_event_.SetReady();
+ break;
+ default:
+ // Impossible
+ abort();
+ }
+ CFSTREAM_SYNC_UNREF(sync, "read callback");
+ });
}
-void CFStreamSync::WriteCallback(CFWriteStreamRef stream, CFStreamEventType type,
+void CFStreamSync::WriteCallback(CFWriteStreamRef stream,
+ CFStreamEventType type,
void* clientCallBackInfo) {
CFStreamSync* sync = static_cast<CFStreamSync*>(clientCallBackInfo);
CFSTREAM_SYNC_REF(sync, "write callback");
- dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
- grpc_core::ExecCtx exec_ctx;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %lu, %p)", stream, type, clientCallBackInfo);
- }
- switch (type) {
- case kCFStreamEventOpenCompleted:
- sync->open_event_.SetReady();
- break;
- case kCFStreamEventCanAcceptBytes:
- case kCFStreamEventEndEncountered:
- sync->write_event_.SetReady();
- break;
- case kCFStreamEventErrorOccurred:
- sync->open_event_.SetReady();
- sync->write_event_.SetReady();
- break;
- default:
- // Impossible
- abort();
- }
- CFSTREAM_SYNC_UNREF(sync, "write callback");
- });
+ dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
+ ^{
+ grpc_core::ExecCtx exec_ctx;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %lu, %p)",
+ stream, type, clientCallBackInfo);
+ }
+ switch (type) {
+ case kCFStreamEventOpenCompleted:
+ sync->open_event_.SetReady();
+ break;
+ case kCFStreamEventCanAcceptBytes:
+ case kCFStreamEventEndEncountered:
+ sync->write_event_.SetReady();
+ break;
+ case kCFStreamEventErrorOccurred:
+ sync->open_event_.SetReady();
+ sync->write_event_.SetReady();
+ break;
+ default:
+ // Impossible
+ abort();
+ }
+ CFSTREAM_SYNC_UNREF(sync, "write callback");
+ });
}
-CFStreamSync::CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
+CFStreamSync::CFStreamSync(CFReadStreamRef read_stream,
+ CFWriteStreamRef write_stream) {
gpr_ref_init(&refcount_, 1);
open_event_.InitEvent();
read_event_.InitEvent();
write_event_.InitEvent();
CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil};
- CFReadStreamSetClient(read_stream,
- kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
- kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
- CFStreamSync::ReadCallback, &ctx);
- CFWriteStreamSetClient(write_stream,
- kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
- kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
- CFStreamSync::WriteCallback, &ctx);
- CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), kCFRunLoopCommonModes);
- CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), kCFRunLoopCommonModes);
+ CFReadStreamSetClient(
+ read_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamSync::ReadCallback, &ctx);
+ CFWriteStreamSetClient(
+ write_stream,
+ kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
+ kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
+ CFStreamSync::WriteCallback, &ctx);
+ CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
+ CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
+ kCFRunLoopCommonModes);
}
CFStreamSync::~CFStreamSync() {
@@ -129,11 +140,17 @@ CFStreamSync::~CFStreamSync() {
write_event_.DestroyEvent();
}
-void CFStreamSync::NotifyOnOpen(grpc_closure* closure) { open_event_.NotifyOn(closure); }
+void CFStreamSync::NotifyOnOpen(grpc_closure* closure) {
+ open_event_.NotifyOn(closure);
+}
-void CFStreamSync::NotifyOnRead(grpc_closure* closure) { read_event_.NotifyOn(closure); }
+void CFStreamSync::NotifyOnRead(grpc_closure* closure) {
+ read_event_.NotifyOn(closure);
+}
-void CFStreamSync::NotifyOnWrite(grpc_closure* closure) { write_event_.NotifyOn(closure); }
+void CFStreamSync::NotifyOnWrite(grpc_closure* closure) {
+ write_event_.NotifyOn(closure);
+}
void CFStreamSync::Shutdown(grpc_error* error) {
open_event_.SetShutdown(GRPC_ERROR_REF(error));
@@ -145,8 +162,9 @@ void CFStreamSync::Shutdown(grpc_error* error) {
void CFStreamSync::Ref(const char* file, int line, const char* reason) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR,
- this, reason, val, val + 1);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val,
+ val + 1);
}
gpr_ref(&refcount_);
}
@@ -154,8 +172,8 @@ void CFStreamSync::Ref(const char* file, int line, const char* reason) {
void CFStreamSync::Unref(const char* file, int line, const char* reason) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
- gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val,
- val - 1);
+ gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
+ reason, val, val - 1);
}
if (gpr_unref(&refcount_)) {
delete this;
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
index 61606ffd11..7deaece904 100644
--- a/src/core/lib/iomgr/tcp_client_cfstream.cc
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -18,6 +18,7 @@
*/
#include <grpc/support/port_platform.h>
+
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_CFSTREAM_TCP_CLIENT
@@ -88,12 +89,13 @@ static void OnAlarm(void* arg, grpc_error* error) {
connect->closure = nil;
const bool done = (--connect->refs == 0);
gpr_mu_unlock(&connect->mu);
- // Only schedule a callback once, by either on_timer or on_connected. The first one issues
- // callback while the second one does cleanup.
+ // Only schedule a callback once, by either on_timer or on_connected. The
+ // first one issues callback while the second one does cleanup.
if (done) {
TCPConnectCleanup(connect);
} else {
- grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
+ grpc_error* error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
GRPC_CLOSURE_SCHED(closure, error);
}
}
@@ -125,8 +127,9 @@ static void OnOpen(void* arg, grpc_error* error) {
CFRelease(stream_error);
}
if (error == GRPC_ERROR_NONE) {
- *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, connect->addr_name,
- connect->resource_quota, connect->stream_sync);
+ *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream,
+ connect->addr_name, connect->resource_quota,
+ connect->stream_sync);
}
}
gpr_mu_unlock(&connect->mu);
@@ -134,7 +137,8 @@ static void OnOpen(void* arg, grpc_error* error) {
}
}
-static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef* host, int* port) {
+static void ParseResolvedAddress(const grpc_resolved_address* addr,
+ CFStringRef* host, int* port) {
char *host_port, *host_string, *port_string;
grpc_sockaddr_to_string(&host_port, addr, 1);
gpr_split_host_port(host_port, &host_string, &port_string);
@@ -148,7 +152,8 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef*
static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
grpc_pollset_set* interested_parties,
const grpc_channel_args* channel_args,
- const grpc_resolved_address* resolved_addr, grpc_millis deadline) {
+ const grpc_resolved_address* resolved_addr,
+ grpc_millis deadline) {
CFStreamTCPConnect* connect;
connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect));
@@ -161,7 +166,8 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
gpr_mu_init(&connect->mu);
if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
+ connect->addr_name);
}
grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
@@ -182,15 +188,18 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
CFStringRef host;
int port;
ParseResolvedAddress(resolved_addr, &host, &port);
- CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, &write_stream);
+ CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream,
+ &write_stream);
CFRelease(host);
connect->read_stream = read_stream;
connect->write_stream = write_stream;
- connect->stream_sync = CFStreamSync::CreateStreamSync(read_stream, write_stream);
+ connect->stream_sync =
+ CFStreamSync::CreateStreamSync(read_stream, write_stream);
GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
grpc_schedule_on_exec_ctx);
connect->stream_sync->NotifyOnOpen(&connect->on_open);
- GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect,
+ grpc_schedule_on_exec_ctx);
gpr_mu_lock(&connect->mu);
CFReadStreamOpen(read_stream);
CFWriteStreamOpen(write_stream);
diff --git a/tools/run_tests/sanity/core_banned_functions.py b/tools/run_tests/sanity/core_banned_functions.py
index 989990542e..8a4d2f1d1c 100755
--- a/tools/run_tests/sanity/core_banned_functions.py
+++ b/tools/run_tests/sanity/core_banned_functions.py
@@ -29,7 +29,7 @@ BANNED_EXCEPT = {
'grpc_slice_buffer_reset_and_unref(': ['src/core/lib/slice/slice_buffer.c'],
'grpc_slice_ref(': ['src/core/lib/slice/slice.c'],
'grpc_slice_unref(': ['src/core/lib/slice/slice.c'],
- 'grpc_error_create(': ['src/core/lib/iomgr/error.c'],
+ 'grpc_error_create(': ['src/core/lib/iomgr/error.c','src/core/lib/iomgr/error_apple.cc'],
'grpc_error_ref(': ['src/core/lib/iomgr/error.c'],
'grpc_error_unref(': ['src/core/lib/iomgr/error.c'],
'grpc_os_error(': ['src/core/lib/iomgr/error.c'],