aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_client_cfstream.cc
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2018-05-18 07:46:22 -0700
committerGravatar Muxi Yan <mxyan@google.com>2018-05-22 08:53:46 -0700
commita70102b1eb52346cfc5b5640df07516c66ef38cb (patch)
tree732b70b1dd9f4d8888a3f3c67d946536143ae60f /src/core/lib/iomgr/tcp_client_cfstream.cc
parentc7d57f1157e7b11c286b635f0a3bc22659cd1900 (diff)
Use cc rather than mm files and merge cfstream related files into core
Diffstat (limited to 'src/core/lib/iomgr/tcp_client_cfstream.cc')
-rw-r--r--src/core/lib/iomgr/tcp_client_cfstream.cc203
1 files changed, 203 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc
new file mode 100644
index 0000000000..61606ffd11
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_cfstream.cc
@@ -0,0 +1,203 @@
+
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM_TCP_CLIENT
+
+#include <CoreFoundation/CoreFoundation.h>
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include <netinet/in.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/error_apple.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_cfstream.h"
+#include "src/core/lib/iomgr/tcp_cfstream_sync.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/timer.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+typedef struct CFStreamTCPConnect {
+ gpr_mu mu;
+ gpr_refcount refcount;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+ CFStreamSync* stream_sync;
+
+ grpc_timer alarm;
+ grpc_closure on_alarm;
+ grpc_closure on_open;
+
+ bool read_stream_open;
+ bool write_stream_open;
+ bool failed;
+
+ grpc_closure* closure;
+ grpc_endpoint** endpoint;
+ int refs;
+ char* addr_name;
+ grpc_resource_quota* resource_quota;
+} CFStreamTCPConnect;
+
+static void TCPConnectCleanup(CFStreamTCPConnect* connect) {
+ grpc_resource_quota_unref_internal(connect->resource_quota);
+ CFSTREAM_SYNC_UNREF(connect->stream_sync, "async connect clean up");
+ CFRelease(connect->read_stream);
+ CFRelease(connect->write_stream);
+ gpr_mu_destroy(&connect->mu);
+ gpr_free(connect->addr_name);
+ gpr_free(connect);
+}
+
+static void OnAlarm(void* arg, grpc_error* error) {
+ CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnAlarm, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+ const bool done = (--connect->refs == 0);
+ gpr_mu_unlock(&connect->mu);
+ // Only schedule a callback once, by either on_timer or on_connected. The first one issues
+ // callback while the second one does cleanup.
+ if (done) {
+ TCPConnectCleanup(connect);
+ } else {
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("connect() timed out");
+ GRPC_CLOSURE_SCHED(closure, error);
+ }
+}
+
+static void OnOpen(void* arg, grpc_error* error) {
+ CFStreamTCPConnect* connect = static_cast<CFStreamTCPConnect*>(arg);
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT :%p OnOpen, error:%p", connect, error);
+ }
+ gpr_mu_lock(&connect->mu);
+ grpc_timer_cancel(&connect->alarm);
+ grpc_closure* closure = connect->closure;
+ connect->closure = nil;
+
+ bool done = (--connect->refs == 0);
+ grpc_endpoint** endpoint = connect->endpoint;
+
+ if (done) {
+ gpr_mu_unlock(&connect->mu);
+ TCPConnectCleanup(connect);
+ } else {
+ if (error == GRPC_ERROR_NONE) {
+ CFErrorRef stream_error = CFReadStreamCopyError(connect->read_stream);
+ if (stream_error == NULL) {
+ stream_error = CFWriteStreamCopyError(connect->write_stream);
+ }
+ if (stream_error) {
+ error = GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "connect() error");
+ CFRelease(stream_error);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ *endpoint = grpc_tcp_create(connect->read_stream, connect->write_stream, connect->addr_name,
+ connect->resource_quota, connect->stream_sync);
+ }
+ }
+ gpr_mu_unlock(&connect->mu);
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_REF(error));
+ }
+}
+
+static void ParseResolvedAddress(const grpc_resolved_address* addr, CFStringRef* host, int* port) {
+ char *host_port, *host_string, *port_string;
+ grpc_sockaddr_to_string(&host_port, addr, 1);
+ gpr_split_host_port(host_port, &host_string, &port_string);
+ *host = CFStringCreateWithCString(NULL, host_string, kCFStringEncodingUTF8);
+ gpr_free(host_string);
+ gpr_free(port_string);
+ gpr_free(host_port);
+ *port = grpc_sockaddr_get_port(addr);
+}
+
+static void TCPClientConnect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* resolved_addr, grpc_millis deadline) {
+ CFStreamTCPConnect* connect;
+
+ connect = (CFStreamTCPConnect*)gpr_zalloc(sizeof(CFStreamTCPConnect));
+ connect->closure = closure;
+ connect->endpoint = ep;
+ connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+ // connect->resource_quota = resource_quota;
+ connect->refs = 2; // One for the connect operation, one for the timer.
+ gpr_ref_init(&connect->refcount, 1);
+ gpr_mu_init(&connect->mu);
+
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", connect->addr_name);
+ }
+
+ grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
+ if (channel_args != NULL) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+ connect->resource_quota = resource_quota;
+
+ CFReadStreamRef read_stream;
+ CFWriteStreamRef write_stream;
+
+ CFStringRef host;
+ int port;
+ ParseResolvedAddress(resolved_addr, &host, &port);
+ CFStreamCreatePairWithSocketToHost(NULL, host, port, &read_stream, &write_stream);
+ CFRelease(host);
+ connect->read_stream = read_stream;
+ connect->write_stream = write_stream;
+ connect->stream_sync = CFStreamSync::CreateStreamSync(read_stream, write_stream);
+ GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect),
+ grpc_schedule_on_exec_ctx);
+ connect->stream_sync->NotifyOnOpen(&connect->on_open);
+ GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx);
+ gpr_mu_lock(&connect->mu);
+ CFReadStreamOpen(read_stream);
+ CFWriteStreamOpen(write_stream);
+ grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
+ gpr_mu_unlock(&connect->mu);
+}
+
+grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {TCPClientConnect};
+
+#endif /* GRPC_CFSTREAM_TCP_CLIENT */