From 92ff0490edfcfdb082659fa13a10ce8857ddd48e Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Tue, 22 May 2018 09:13:55 -0700 Subject: sanity --- src/core/lib/iomgr/error_apple.cc | 18 ++-- src/core/lib/iomgr/tcp_cfstream.cc | 107 +++++++++++++-------- src/core/lib/iomgr/tcp_cfstream_sync.cc | 148 +++++++++++++++++------------- src/core/lib/iomgr/tcp_client_cfstream.cc | 31 ++++--- 4 files changed, 180 insertions(+), 124 deletions(-) (limited to 'src/core/lib') diff --git a/src/core/lib/iomgr/error_apple.cc b/src/core/lib/iomgr/error_apple.cc index 42ab6ad4b8..95d69ecee9 100644 --- a/src/core/lib/iomgr/error_apple.cc +++ b/src/core/lib/iomgr/error_apple.cc @@ -28,21 +28,23 @@ #define MAX_ERROR_DESCRIPTION 256 -grpc_error* grpc_error_create_from_cferror(const char* file, int line, void* arg, - const char* custom_desc) { +grpc_error* grpc_error_create_from_cferror(const char* file, int line, + void* arg, const char* custom_desc) { CFErrorRef error = static_cast(arg); char buf_domain[MAX_ERROR_DESCRIPTION], buf_desc[MAX_ERROR_DESCRIPTION]; char* error_msg; CFErrorDomain domain = CFErrorGetDomain((error)); CFIndex code = CFErrorGetCode((error)); CFStringRef desc = CFErrorCopyDescription((error)); - CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION, kCFStringEncodingUTF8); - CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION, kCFStringEncodingUTF8); - gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)", custom_desc, - buf_domain, code, buf_desc); + CFStringGetCString(domain, buf_domain, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + CFStringGetCString(desc, buf_desc, MAX_ERROR_DESCRIPTION, + kCFStringEncodingUTF8); + gpr_asprintf(&error_msg, "%s (error domain:%s, code:%ld, description:%s)", + custom_desc, buf_domain, code, buf_desc); CFRelease(desc); - grpc_error* return_error = - grpc_error_create(file, line, grpc_slice_from_copied_string(error_msg), NULL, 0); + grpc_error* return_error = grpc_error_create( + file, line, grpc_slice_from_copied_string(error_msg), NULL, 0); gpr_free(error_msg); return return_error; } diff --git a/src/core/lib/iomgr/tcp_cfstream.cc b/src/core/lib/iomgr/tcp_cfstream.cc index e21c983671..f7ef03d228 100644 --- a/src/core/lib/iomgr/tcp_cfstream.cc +++ b/src/core/lib/iomgr/tcp_cfstream.cc @@ -17,6 +17,7 @@ */ #include + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_CFSTREAM_TCP @@ -72,21 +73,25 @@ static void TCPFree(CFStreamTCP* tcp) { #ifndef NDEBUG #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) #define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file, int line) { +static void tcp_unref(CFStreamTCP* tcp, const char* reason, const char* file, + int line) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, - reason, val, val - 1); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, + val - 1); } if (gpr_unref(&tcp->refcount)) { TCPFree(tcp); } } -static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file, int line) { +static void tcp_ref(CFStreamTCP* tcp, const char* reason, const char* file, + int line) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, - reason, val, val + 1); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, + val + 1); } gpr_ref(&tcp->refcount); } @@ -103,20 +108,23 @@ static void tcp_ref(CFStreamTCP* tcp) { gpr_ref(&tcp->refcount); } static grpc_error* TCPAnnotateError(grpc_error* src_error, CFStreamTCP* tcp) { return grpc_error_set_str( - grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), - GRPC_ERROR_STR_TARGET_ADDRESS, grpc_slice_from_copied_string(tcp->peer_string)); + grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE), + GRPC_ERROR_STR_TARGET_ADDRESS, + grpc_slice_from_copied_string(tcp->peer_string)); } static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) { if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb, tcp->read_cb->cb, - tcp->read_cb->cb_arg); + gpr_log(GPR_DEBUG, "TCP:%p call_read_cb %p %p:%p", tcp, tcp->read_cb, + tcp->read_cb->cb, tcp->read_cb->cb_arg); size_t i; const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "read: error=%s", str); for (i = 0; i < tcp->read_slices->count; i++) { - char* dump = grpc_dump_slice(tcp->read_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + char* dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); } @@ -129,8 +137,8 @@ static void CallReadCB(CFStreamTCP* tcp, grpc_error* error) { static void CallWriteCB(CFStreamTCP* tcp, grpc_error* error) { if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb, tcp->write_cb->cb, - tcp->write_cb->cb_arg); + gpr_log(GPR_DEBUG, "TCP:%p call_write_cb %p %p:%p", tcp, tcp->write_cb, + tcp->write_cb->cb, tcp->write_cb->cb_arg); const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: error=%s", str); } @@ -153,17 +161,21 @@ static void ReadAction(void* arg, grpc_error* error) { GPR_ASSERT(tcp->read_slices->count == 1); grpc_slice slice = tcp->read_slices->slices[0]; size_t len = GRPC_SLICE_LENGTH(slice); - CFIndex read_size = CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len); + CFIndex read_size = + CFReadStreamRead(tcp->read_stream, GRPC_SLICE_START_PTR(slice), len); if (read_size == -1) { grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); CFErrorRef stream_error = CFReadStreamCopyError(tcp->read_stream); - CallReadCB(tcp, - TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), tcp)); + CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR( + stream_error, "Read error"), + tcp)); CFRelease(stream_error); TCP_UNREF(tcp, "read"); } else if (read_size == 0) { grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices); - CallReadCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); + CallReadCB(tcp, + TCPAnnotateError( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(tcp, "read"); } else { if (read_size < len) { @@ -186,19 +198,20 @@ static void WriteAction(void* arg, grpc_error* error) { grpc_slice slice = grpc_slice_buffer_take_first(tcp->write_slices); size_t slice_len = GRPC_SLICE_LENGTH(slice); - CFIndex write_size = - CFWriteStreamWrite(tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); + CFIndex write_size = CFWriteStreamWrite( + tcp->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); if (write_size == -1) { grpc_slice_buffer_reset_and_unref_internal(tcp->write_slices); CFErrorRef stream_error = CFWriteStreamCopyError(tcp->write_stream); - CallWriteCB( - tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), tcp)); + CallWriteCB(tcp, TCPAnnotateError(GRPC_ERROR_CREATE_FROM_CFERROR( + stream_error, "write failed."), + tcp)); CFRelease(stream_error); TCP_UNREF(tcp, "write"); } else { if (write_size < GRPC_SLICE_LENGTH(slice)) { - grpc_slice_buffer_undo_take_first(tcp->write_slices, - grpc_slice_sub(slice, write_size, slice_len)); + grpc_slice_buffer_undo_take_first( + tcp->write_slices, grpc_slice_sub(slice, write_size, slice_len)); } if (tcp->write_slices->length > 0) { tcp->stream_sync->NotifyOnWrite(&tcp->write_action); @@ -212,10 +225,10 @@ static void WriteAction(void* arg, grpc_error* error) { char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); - grpc_slice_unref(trace_slice); + grpc_slice_unref_internal(trace_slice); } } - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); } static void TCPReadAllocationDone(void* arg, grpc_error* error) { @@ -229,24 +242,29 @@ static void TCPReadAllocationDone(void* arg, grpc_error* error) { } } -static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) { +static void TCPRead(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { CFStreamTCP* tcp = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb, slices->length); + gpr_log(GPR_DEBUG, "tcp:%p read (%p, %p) length:%zu", tcp, slices, cb, + slices->length); } GPR_ASSERT(tcp->read_cb == nullptr); tcp->read_cb = cb; tcp->read_slices = slices; grpc_slice_buffer_reset_and_unref_internal(slices); - grpc_resource_user_alloc_slices(&tcp->slice_allocator, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, + grpc_resource_user_alloc_slices(&tcp->slice_allocator, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, tcp->read_slices); TCP_REF(tcp, "read"); } -static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb) { +static void TCPWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, + grpc_closure* cb) { CFStreamTCP* tcp = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb, slices->length); + gpr_log(GPR_DEBUG, "tcp:%p write (%p, %p) length:%zu", tcp, slices, cb, + slices->length); } GPR_ASSERT(tcp->write_cb == nullptr); tcp->write_cb = cb; @@ -290,17 +308,26 @@ void TCPAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} void TCPAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} void TCPDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} -static const grpc_endpoint_vtable vtable = { - TCPRead, TCPWrite, TCPAddToPollset, TCPAddToPollsetSet, TCPDeleteFromPollsetSet, - TCPShutdown, TCPDestroy, TCPGetResourceUser, TCPGetPeer, TCPGetFD}; - -grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, CFWriteStreamRef write_stream, - const char* peer_string, grpc_resource_quota* resource_quota, +static const grpc_endpoint_vtable vtable = {TCPRead, + TCPWrite, + TCPAddToPollset, + TCPAddToPollsetSet, + TCPDeleteFromPollsetSet, + TCPShutdown, + TCPDestroy, + TCPGetResourceUser, + TCPGetPeer, + TCPGetFD}; + +grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream, + const char* peer_string, + grpc_resource_quota* resource_quota, CFStreamSync* stream_sync) { CFStreamTCP* tcp = static_cast(gpr_malloc(sizeof(CFStreamTCP))); if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp, read_stream, - write_stream); + gpr_log(GPR_DEBUG, "tcp:%p create readStream:%p writeStream: %p", tcp, + read_stream, write_stream); } tcp->base.vtable = &vtable; gpr_ref_init(&tcp->refcount, 1); @@ -321,8 +348,8 @@ grpc_endpoint* grpc_tcp_create(CFReadStreamRef read_stream, CFWriteStreamRef wri GRPC_CLOSURE_INIT(&tcp->write_action, WriteAction, static_cast(tcp), grpc_schedule_on_exec_ctx); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); - grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, tcp->resource_user, - TCPReadAllocationDone, tcp); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, TCPReadAllocationDone, tcp); return &tcp->base; } diff --git a/src/core/lib/iomgr/tcp_cfstream_sync.cc b/src/core/lib/iomgr/tcp_cfstream_sync.cc index bc5bd0a600..5571db7491 100644 --- a/src/core/lib/iomgr/tcp_cfstream_sync.cc +++ b/src/core/lib/iomgr/tcp_cfstream_sync.cc @@ -17,6 +17,7 @@ */ #include + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_CFSTREAM @@ -52,75 +53,85 @@ void CFStreamSync::ReadCallback(CFReadStreamRef stream, CFStreamEventType type, void* client_callback_info) { CFStreamSync* sync = static_cast(client_callback_info); CFSTREAM_SYNC_REF(sync, "read callback"); - dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %lu, %p)", stream, type, client_callback_info); - } - switch (type) { - case kCFStreamEventOpenCompleted: - sync->open_event_.SetReady(); - break; - case kCFStreamEventHasBytesAvailable: - case kCFStreamEventEndEncountered: - sync->read_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - sync->open_event_.SetReady(); - sync->read_event_.SetReady(); - break; - default: - // Impossible - abort(); - } - CFSTREAM_SYNC_UNREF(sync, "read callback"); - }); + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), + ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP ReadCallback (%p, %lu, %p)", + stream, type, client_callback_info); + } + switch (type) { + case kCFStreamEventOpenCompleted: + sync->open_event_.SetReady(); + break; + case kCFStreamEventHasBytesAvailable: + case kCFStreamEventEndEncountered: + sync->read_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + sync->open_event_.SetReady(); + sync->read_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(sync, "read callback"); + }); } -void CFStreamSync::WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, +void CFStreamSync::WriteCallback(CFWriteStreamRef stream, + CFStreamEventType type, void* clientCallBackInfo) { CFStreamSync* sync = static_cast(clientCallBackInfo); CFSTREAM_SYNC_REF(sync, "write callback"); - dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{ - grpc_core::ExecCtx exec_ctx; - if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %lu, %p)", stream, type, clientCallBackInfo); - } - switch (type) { - case kCFStreamEventOpenCompleted: - sync->open_event_.SetReady(); - break; - case kCFStreamEventCanAcceptBytes: - case kCFStreamEventEndEncountered: - sync->write_event_.SetReady(); - break; - case kCFStreamEventErrorOccurred: - sync->open_event_.SetReady(); - sync->write_event_.SetReady(); - break; - default: - // Impossible - abort(); - } - CFSTREAM_SYNC_UNREF(sync, "write callback"); - }); + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), + ^{ + grpc_core::ExecCtx exec_ctx; + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "TCP WriteCallback (%p, %lu, %p)", + stream, type, clientCallBackInfo); + } + switch (type) { + case kCFStreamEventOpenCompleted: + sync->open_event_.SetReady(); + break; + case kCFStreamEventCanAcceptBytes: + case kCFStreamEventEndEncountered: + sync->write_event_.SetReady(); + break; + case kCFStreamEventErrorOccurred: + sync->open_event_.SetReady(); + sync->write_event_.SetReady(); + break; + default: + // Impossible + abort(); + } + CFSTREAM_SYNC_UNREF(sync, "write callback"); + }); } -CFStreamSync::CFStreamSync(CFReadStreamRef read_stream, CFWriteStreamRef write_stream) { +CFStreamSync::CFStreamSync(CFReadStreamRef read_stream, + CFWriteStreamRef write_stream) { gpr_ref_init(&refcount_, 1); open_event_.InitEvent(); read_event_.InitEvent(); write_event_.InitEvent(); CFStreamClientContext ctx = {0, static_cast(this), nil, nil, nil}; - CFReadStreamSetClient(read_stream, - kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | - kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, - CFStreamSync::ReadCallback, &ctx); - CFWriteStreamSetClient(write_stream, - kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | - kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, - CFStreamSync::WriteCallback, &ctx); - CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), kCFRunLoopCommonModes); - CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), kCFRunLoopCommonModes); + CFReadStreamSetClient( + read_stream, + kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamSync::ReadCallback, &ctx); + CFWriteStreamSetClient( + write_stream, + kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | + kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, + CFStreamSync::WriteCallback, &ctx); + CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); + CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(), + kCFRunLoopCommonModes); } CFStreamSync::~CFStreamSync() { @@ -129,11 +140,17 @@ CFStreamSync::~CFStreamSync() { write_event_.DestroyEvent(); } -void CFStreamSync::NotifyOnOpen(grpc_closure* closure) { open_event_.NotifyOn(closure); } +void CFStreamSync::NotifyOnOpen(grpc_closure* closure) { + open_event_.NotifyOn(closure); +} -void CFStreamSync::NotifyOnRead(grpc_closure* closure) { read_event_.NotifyOn(closure); } +void CFStreamSync::NotifyOnRead(grpc_closure* closure) { + read_event_.NotifyOn(closure); +} -void CFStreamSync::NotifyOnWrite(grpc_closure* closure) { write_event_.NotifyOn(closure); } +void CFStreamSync::NotifyOnWrite(grpc_closure* closure) { + write_event_.NotifyOn(closure); +} void CFStreamSync::Shutdown(grpc_error* error) { open_event_.SetShutdown(GRPC_ERROR_REF(error)); @@ -145,8 +162,9 @@ void CFStreamSync::Shutdown(grpc_error* error) { void CFStreamSync::Ref(const char* file, int line, const char* reason) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); - gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, - this, reason, val, val + 1); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP SYNC ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val, + val + 1); } gpr_ref(&refcount_); } @@ -154,8 +172,8 @@ void CFStreamSync::Ref(const char* file, int line, const char* reason) { void CFStreamSync::Unref(const char* file, int line, const char* reason) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count); - gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, reason, val, - val - 1); + gpr_log(GPR_ERROR, "TCP SYNC unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this, + reason, val, val - 1); } if (gpr_unref(&refcount_)) { delete this; diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index 61606ffd11..7deaece904 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -18,6 +18,7 @@ */ #include + #include "src/core/lib/iomgr/port.h" #ifdef GRPC_CFSTREAM_TCP_CLIENT @@ -88,12 +89,13 @@ static void OnAlarm(void* arg, grpc_error* error) { connect->closure = nil; const bool done = (--connect->refs == 0); gpr_mu_unlock(&connect->mu); - // Only schedule a callback once, by either on_timer or on_connected. The first one issues - // callback while the second one does cleanup. + // Only schedule a callback once, by either on_timer or on_connected. The + // first one issues callback while the second one does cleanup. if (done) { TCPConnectCleanup(connect); } else { - grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); + grpc_error* error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); GRPC_CLOSURE_SCHED(closure, error); } } @@ -125,8 +127,9 @@ static void OnOpen(void* arg, grpc_error* error) { CFRelease(stream_error); } if (error == GRPC_ERROR_NONE) { - *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, connect->addr_name, - connect->resource_quota, connect->stream_sync); + *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, + connect->addr_name, connect->resource_quota, + connect->stream_sync); } } gpr_mu_unlock(&connect->mu); @@ -134,7 +137,8 @@ static void OnOpen(void* arg, grpc_error* error) { } } -static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef* host, int* port) { +static void ParseResolvedAddress(const grpc_resolved_address* addr, + CFStringRef* host, int* port) { char *host_port, *host_string, *port_string; grpc_sockaddr_to_string(&host_port, addr, 1); gpr_split_host_port(host_port, &host_string, &port_string); @@ -148,7 +152,8 @@ static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef* static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args, - const grpc_resolved_address* resolved_addr, grpc_millis deadline) { + const grpc_resolved_address* resolved_addr, + grpc_millis deadline) { CFStreamTCPConnect* connect; connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect)); @@ -161,7 +166,8 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, gpr_mu_init(&connect->mu); if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name); + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", + connect->addr_name); } grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); @@ -182,15 +188,18 @@ static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, CFStringRef host; int port; ParseResolvedAddress(resolved_addr, &host, &port); - CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, &write_stream); + CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, + &write_stream); CFRelease(host); connect->read_stream = read_stream; connect->write_stream = write_stream; - connect->stream_sync = CFStreamSync::CreateStreamSync(read_stream, write_stream); + connect->stream_sync = + CFStreamSync::CreateStreamSync(read_stream, write_stream); GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast(connect), grpc_schedule_on_exec_ctx); connect->stream_sync->NotifyOnOpen(&connect->on_open); - GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, + grpc_schedule_on_exec_ctx); gpr_mu_lock(&connect->mu); CFReadStreamOpen(read_stream); CFWriteStreamOpen(write_stream); -- cgit v1.2.3