diff options
author | Muxi Yan <mxyan@google.com> | 2018-04-25 13:58:10 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2018-05-15 12:39:06 -0700 |
commit | cdf709c76e1e0bf69426f69854e61eeba42e0ecd (patch) | |
tree | 179915b5b6d064e76b66bb3914395f99f61d5cd2 /src/core/lib/iomgr/tcp_client_cfstream.mm | |
parent | 6abc0aef5d07bb55477c2f59c4793f3452e51efc (diff) |
CFStream tcp client and endpoint
Diffstat (limited to 'src/core/lib/iomgr/tcp_client_cfstream.mm')
-rw-r--r-- | src/core/lib/iomgr/tcp_client_cfstream.mm | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.mm b/src/core/lib/iomgr/tcp_client_cfstream.mm new file mode 100644 index 0000000000..c2c77bd9ca --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_cfstream.mm @@ -0,0 +1,203 @@ + +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_CFSTREAM_TCP_CLIENT + +#include <Foundation/Foundation.h> + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> + +#include <netinet/in.h> + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/gpr/host_port.h" +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/error_apple.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_cfstream.h" +#include "src/core/lib/iomgr/tcp_cfstream_sync.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/timer.h" + +extern grpc_core::TraceFlag grpc_tcp_trace; + +typedef struct CFStreamTCPConnect { + gpr_mu mu; + gpr_refcount refcount; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + CFStreamSync* stream_sync; + + grpc_timer alarm; + grpc_closure on_alarm; + grpc_closure on_open; + + bool read_stream_open; + bool write_stream_open; + bool failed; + + grpc_closure* closure; + grpc_endpoint** endpoint; + int refs; + char* addr_name; + grpc_resource_quota* resource_quota; +} CFStreamTCPConnect; + +static void TCPConnectCleanup(CFStreamTCPConnect* connect) { + grpc_resource_quota_unref_internal(connect->resource_quota); + CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up"); + CFRelease(connect->read_stream); + CFRelease(connect->write_stream); + gpr_mu_destroy(&connect->mu); + gpr_free(connect->addr_name); + gpr_free(connect); +} + +static void OnAlarm(void* arg, grpc_error* error) { + CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_closure* closure = connect->closure; + connect->closure = nil; + const bool done = (--connect->refs == 0); + gpr_mu_unlock(&connect->mu); + // Only schedule a callback once, by either on_timer or on_connected. The first one issues + // callback while the second one does cleanup. + if (done) { + TCPConnectCleanup(connect); + } else { + grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out"); + GRPC_CLOSURE_SCHED(closure, error); + } +} + +static void OnOpen(void* arg, grpc_error* error) { + CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error); + } + gpr_mu_lock(&connect->mu); + grpc_timer_cancel(&connect->alarm); + grpc_closure* closure = connect->closure; + connect->closure = nil; + + bool done = (--connect->refs == 0); + grpc_endpoint** endpoint = connect->endpoint; + + if (done) { + gpr_mu_unlock(&connect->mu); + TCPConnectCleanup(connect); + } else { + if (error == GRPC_ERROR_NONE) { + CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream); + if (stream_error == NULL) { + stream_error = CFWriteStreamCopyError(connect->write_stream); + } + if (stream_error) { + error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error"); + CFRelease(stream_error); + } + if (error == GRPC_ERROR_NONE) { + *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, connect->addr_name, + connect->resource_quota, connect->stream_sync); + } + } + gpr_mu_unlock(&connect->mu); + GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error)); + } +} + +static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef* host, int* port) { + char *host_port, *host_string, *port_string; + grpc_sockaddr_to_string(&host_port, addr, 1); + gpr_split_host_port(host_port, &host_string, &port_string); + *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8); + gpr_free(host_string); + gpr_free(port_string); + gpr_free(host_port); + *port = grpc_sockaddr_get_port(addr); +} + +static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep, + grpc_pollset_set* interested_parties, + const grpc_channel_args* channel_args, + const grpc_resolved_address* resolved_addr, grpc_millis deadline) { + CFStreamTCPConnect* connect; + + connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect)); + connect->closure = closure; + connect->endpoint = ep; + connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + // connect->resource_quota = resource_quota; + connect->refs = 2; // One for the connect operation, one for the timer. + gpr_ref_init(&connect->refcount, 1); + gpr_mu_init(&connect->mu); + + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name); + } + + grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + (grpc_resource_quota*)channel_args->args[i].value.pointer.p); + } + } + } + connect->resource_quota = resource_quota; + + CFReadStreamRef read_stream; + CFWriteStreamRef write_stream; + + CFStringRef host; + int port; + ParseResolvedAddress(resolved_addr, &host, &port); + CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, &write_stream); + CFRelease(host); + connect->read_stream = read_stream; + connect->write_stream = write_stream; + connect->stream_sync = CFStreamSync::CreateStreamSync(read_stream, write_stream); + GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), + grpc_schedule_on_exec_ctx); + connect->stream_sync->NotifyOnOpen(&connect->on_open); + GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx); + gpr_mu_lock(&connect->mu); + CFReadStreamOpen(read_stream); + CFWriteStreamOpen(write_stream); + grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm); + gpr_mu_unlock(&connect->mu); +} + +grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect}; + +#endif /* GRPC_CFSTREAM_TCP_CLIENT */ |