aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar kpayson64 <kpayson@google.com>2018-03-12 19:16:30 -0700
committerGravatar kpayson64 <kpayson@google.com>2018-03-13 01:21:42 -0700
commit539f5068bd14e3d07b58309b657222919e94aba5 (patch)
treebc5980993f806841a79c973d302524ab8adba576 /src
parent44fd6557aefad4689eac7225386aecefd1f9a5bc (diff)
Abstract libuv implementation
Structures the libuv implementation to allow for a plugable BSD style socket implementation to allow for other IO Managers
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc15
-rw-r--r--src/core/ext/filters/client_channel/parse_address.cc26
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc24
-rw-r--r--src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc1
-rw-r--r--src/core/lib/iomgr/endpoint.cc2
-rw-r--r--src/core/lib/iomgr/endpoint_pair_windows.cc9
-rw-r--r--src/core/lib/iomgr/ev_posix.cc52
-rw-r--r--src/core/lib/iomgr/exec_ctx.h7
-rw-r--r--src/core/lib/iomgr/iomgr.cc1
-rw-r--r--src/core/lib/iomgr/iomgr_custom.cc63
-rw-r--r--src/core/lib/iomgr/iomgr_custom.h (renamed from src/core/lib/iomgr/iomgr_uv.h)24
-rw-r--r--src/core/lib/iomgr/iomgr_internal.cc43
-rw-r--r--src/core/lib/iomgr/iomgr_internal.h12
-rw-r--r--src/core/lib/iomgr/iomgr_posix.cc31
-rw-r--r--src/core/lib/iomgr/iomgr_uv.cc35
-rw-r--r--src/core/lib/iomgr/iomgr_windows.cc30
-rw-r--r--src/core/lib/iomgr/pollset.cc56
-rw-r--r--src/core/lib/iomgr/pollset.h18
-rw-r--r--src/core/lib/iomgr/pollset_custom.cc106
-rw-r--r--src/core/lib/iomgr/pollset_custom.h35
-rw-r--r--src/core/lib/iomgr/pollset_set.cc (renamed from src/core/lib/iomgr/pollset_set_uv.cc)36
-rw-r--r--src/core/lib/iomgr/pollset_set.h11
-rw-r--r--src/core/lib/iomgr/pollset_set_custom.cc48
-rw-r--r--src/core/lib/iomgr/pollset_set_custom.h (renamed from src/core/lib/iomgr/timer_uv.h)18
-rw-r--r--src/core/lib/iomgr/pollset_set_windows.cc25
-rw-r--r--src/core/lib/iomgr/pollset_uv.cc145
-rw-r--r--src/core/lib/iomgr/pollset_uv.h9
-rw-r--r--src/core/lib/iomgr/pollset_windows.cc28
-rw-r--r--src/core/lib/iomgr/port.h27
-rw-r--r--src/core/lib/iomgr/resolve_address.cc50
-rw-r--r--src/core/lib/iomgr/resolve_address.h29
-rw-r--r--src/core/lib/iomgr/resolve_address_custom.cc187
-rw-r--r--src/core/lib/iomgr/resolve_address_custom.h43
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.cc28
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.cc286
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.cc28
-rw-r--r--src/core/lib/iomgr/resource_quota.h4
-rw-r--r--src/core/lib/iomgr/sockaddr.h14
-rw-r--r--src/core/lib/iomgr/sockaddr_custom.h54
-rw-r--r--src/core/lib/iomgr/sockaddr_posix.h24
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.cc156
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.h2
-rw-r--r--src/core/lib/iomgr/sockaddr_windows.h19
-rw-r--r--src/core/lib/iomgr/socket_utils.h9
-rw-r--r--src/core/lib/iomgr/socket_utils_common_posix.cc18
-rw-r--r--src/core/lib/iomgr/socket_utils_linux.cc3
-rw-r--r--src/core/lib/iomgr/socket_utils_posix.cc2
-rw-r--r--src/core/lib/iomgr/socket_utils_uv.cc17
-rw-r--r--src/core/lib/iomgr/socket_utils_windows.cc8
-rw-r--r--src/core/lib/iomgr/tcp_client.cc36
-rw-r--r--src/core/lib/iomgr/tcp_client.h11
-rw-r--r--src/core/lib/iomgr/tcp_client_custom.cc151
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc30
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.cc177
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.cc31
-rw-r--r--src/core/lib/iomgr/tcp_custom.cc365
-rw-r--r--src/core/lib/iomgr/tcp_custom.h83
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc2
-rw-r--r--src/core/lib/iomgr/tcp_server.cc73
-rw-r--r--src/core/lib/iomgr/tcp_server.h22
-rw-r--r--src/core/lib/iomgr/tcp_server_custom.cc479
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc49
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_common.cc5
-rw-r--r--src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc8
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.cc473
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.cc62
-rw-r--r--src/core/lib/iomgr/tcp_uv.cc627
-rw-r--r--src/core/lib/iomgr/tcp_uv.h53
-rw-r--r--src/core/lib/iomgr/tcp_windows.cc2
-rw-r--r--src/core/lib/iomgr/timer.cc45
-rw-r--r--src/core/lib/iomgr/timer.h48
-rw-r--r--src/core/lib/iomgr/timer_custom.cc93
-rw-r--r--src/core/lib/iomgr/timer_custom.h43
-rw-r--r--src/core/lib/iomgr/timer_generic.cc20
-rw-r--r--src/core/lib/iomgr/timer_heap.cc4
-rw-r--r--src/core/lib/iomgr/timer_uv.cc62
-rw-r--r--src/core/lib/iomgr/udp_server.cc15
-rw-r--r--src/core/lib/iomgr/unix_sockets_posix.cc12
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py19
79 files changed, 3123 insertions, 1895 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index cb39e4224e..47e1deef12 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -61,6 +61,7 @@
#include <grpc/support/port_platform.h>
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include <inttypes.h>
#include <limits.h>
@@ -417,20 +418,20 @@ void ParseServer(const grpc_grpclb_server* server,
grpc_resolved_address* addr) {
memset(addr, 0, sizeof(*addr));
if (server->drop) return;
- const uint16_t netorder_port = htons((uint16_t)server->port);
+ const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
/* the addresses are given in binary format (a in(6)_addr struct) in
* server->ip_address.bytes. */
const grpc_grpclb_ip_address* ip = &server->ip_address;
if (ip->size == 4) {
- addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in* addr4 = (struct sockaddr_in*)&addr->addr;
- addr4->sin_family = AF_INET;
+ addr->len = sizeof(grpc_sockaddr_in);
+ grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
+ addr4->sin_family = GRPC_AF_INET;
memcpy(&addr4->sin_addr, ip->bytes, ip->size);
addr4->sin_port = netorder_port;
} else if (ip->size == 16) {
- addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6* addr6 = (struct sockaddr_in6*)&addr->addr;
- addr6->sin6_family = AF_INET6;
+ addr->len = sizeof(grpc_sockaddr_in6);
+ grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
+ addr6->sin6_family = GRPC_AF_INET6;
memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
addr6->sin6_port = netorder_port;
}
diff --git a/src/core/ext/filters/client_channel/parse_address.cc b/src/core/ext/filters/client_channel/parse_address.cc
index e78dc99e0b..92ea259cf0 100644
--- a/src/core/ext/filters/client_channel/parse_address.cc
+++ b/src/core/ext/filters/client_channel/parse_address.cc
@@ -20,6 +20,7 @@
#include "src/core/ext/filters/client_channel/parse_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/socket_utils.h"
#include <stdio.h>
#include <string.h>
@@ -71,10 +72,10 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
if (!gpr_split_host_port(hostport, &host, &port)) return false;
// Parse IP address.
memset(addr, 0, sizeof(*addr));
- addr->len = sizeof(struct sockaddr_in);
- struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr->addr);
- in->sin_family = AF_INET;
- if (inet_pton(AF_INET, host, &in->sin_addr) == 0) {
+ addr->len = sizeof(grpc_sockaddr_in);
+ grpc_sockaddr_in* in = reinterpret_cast<grpc_sockaddr_in*>(addr->addr);
+ in->sin_family = GRPC_AF_INET;
+ if (grpc_inet_pton(GRPC_AF_INET, host, &in->sin_addr) == 0) {
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host);
goto done;
}
@@ -88,7 +89,7 @@ bool grpc_parse_ipv4_hostport(const char* hostport, grpc_resolved_address* addr,
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port);
goto done;
}
- in->sin_port = htons(static_cast<uint16_t>(port_num));
+ in->sin_port = grpc_htons(static_cast<uint16_t>(port_num));
success = true;
done:
gpr_free(host);
@@ -117,19 +118,20 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
if (!gpr_split_host_port(hostport, &host, &port)) return false;
// Parse IP address.
memset(addr, 0, sizeof(*addr));
- addr->len = sizeof(struct sockaddr_in6);
- struct sockaddr_in6* in6 = reinterpret_cast<struct sockaddr_in6*>(addr->addr);
- in6->sin6_family = AF_INET6;
+ addr->len = sizeof(grpc_sockaddr_in6);
+ grpc_sockaddr_in6* in6 = reinterpret_cast<grpc_sockaddr_in6*>(addr->addr);
+ in6->sin6_family = GRPC_AF_INET6;
// Handle the RFC6874 syntax for IPv6 zone identifiers.
char* host_end = static_cast<char*>(gpr_memrchr(host, '%', strlen(host)));
if (host_end != nullptr) {
GPR_ASSERT(host_end >= host);
- char host_without_scope[INET6_ADDRSTRLEN];
+ char host_without_scope[GRPC_INET6_ADDRSTRLEN];
size_t host_without_scope_len = static_cast<size_t>(host_end - host);
uint32_t sin6_scope_id = 0;
strncpy(host_without_scope, host, host_without_scope_len);
host_without_scope[host_without_scope_len] = '\0';
- if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) {
+ if (grpc_inet_pton(GRPC_AF_INET6, host_without_scope, &in6->sin6_addr) ==
+ 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope);
goto done;
}
@@ -142,7 +144,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
// Handle "sin6_scope_id" being type "u_long". See grpc issue #10027.
in6->sin6_scope_id = sin6_scope_id;
} else {
- if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
+ if (grpc_inet_pton(GRPC_AF_INET6, host, &in6->sin6_addr) == 0) {
gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
goto done;
}
@@ -157,7 +159,7 @@ bool grpc_parse_ipv6_hostport(const char* hostport, grpc_resolved_address* addr,
if (log_errors) gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port);
goto done;
}
- in6->sin6_port = htons(static_cast<uint16_t>(port_num));
+ in6->sin6_port = grpc_htons(static_cast<uint16_t>(port_num));
success = true;
done:
gpr_free(host);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index aa93e5d8de..a895afa784 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -440,6 +440,27 @@ class AresDnsResolverFactory : public ResolverFactory {
} // namespace grpc_core
+extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
+static grpc_address_resolver_vtable* default_resolver;
+
+static void resolve_address_ares(const char* addr, const char* default_port,
+ grpc_pollset_set* interested_parties,
+ grpc_closure* on_done,
+ grpc_resolved_addresses** addrs) {
+ grpc_resolve_address_ares(addr, default_port, interested_parties, on_done,
+ addrs);
+}
+
+static grpc_error* blocking_resolve_address_ares(
+ const char* name, const char* default_port,
+ grpc_resolved_addresses** addresses) {
+ return default_resolver->blocking_resolve_address(name, default_port,
+ addresses);
+}
+
+static grpc_address_resolver_vtable ares_resolver = {
+ resolve_address_ares, blocking_resolve_address_ares};
+
void grpc_resolver_dns_ares_init() {
char* resolver_env = gpr_getenv("GRPC_DNS_RESOLVER");
/* TODO(zyc): Turn on c-ares based resolver by default after the address
@@ -450,7 +471,8 @@ void grpc_resolver_dns_ares_init() {
GRPC_LOG_IF_ERROR("ares_library_init() failed", error);
return;
}
- grpc_resolve_address = grpc_resolve_address_ares;
+ default_resolver = grpc_resolve_address_impl;
+ grpc_set_resolver_impl(&ares_resolver);
grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
grpc_core::UniquePtr<grpc_core::ResolverFactory>(
grpc_core::New<grpc_core::AresDnsResolverFactory>()));
diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
index 822236dd2d..99f18cdf39 100644
--- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
+++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.cc
@@ -41,6 +41,5 @@ int grpc_server_add_insecure_http2_port(grpc_server* server, const char* addr) {
GRPC_ERROR_UNREF(err);
}
-
return port_num;
}
diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc
index e22c21e4bd..92e7930111 100644
--- a/src/core/lib/iomgr/endpoint.cc
+++ b/src/core/lib/iomgr/endpoint.cc
@@ -20,6 +20,8 @@
#include "src/core/lib/iomgr/endpoint.h"
+grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
+
void grpc_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
grpc_closure* cb) {
ep->vtable->read(ep, slices, cb);
diff --git a/src/core/lib/iomgr/endpoint_pair_windows.cc b/src/core/lib/iomgr/endpoint_pair_windows.cc
index 416c9d88a1..177331d681 100644
--- a/src/core/lib/iomgr/endpoint_pair_windows.cc
+++ b/src/core/lib/iomgr/endpoint_pair_windows.cc
@@ -22,6 +22,7 @@
#ifdef GRPC_WINSOCK_SOCKET
#include "src/core/lib/iomgr/endpoint_pair.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include <errno.h>
@@ -46,19 +47,19 @@ static void create_sockets(SOCKET sv[2]) {
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_family = AF_INET;
- GPR_ASSERT(bind(lst_sock, (struct sockaddr*)&addr, sizeof(addr)) !=
+ GPR_ASSERT(bind(lst_sock, (grpc_sockaddr*)&addr, sizeof(addr)) !=
SOCKET_ERROR);
GPR_ASSERT(listen(lst_sock, SOMAXCONN) != SOCKET_ERROR);
- GPR_ASSERT(getsockname(lst_sock, (struct sockaddr*)&addr, &addr_len) !=
+ GPR_ASSERT(getsockname(lst_sock, (grpc_sockaddr*)&addr, &addr_len) !=
SOCKET_ERROR);
cli_sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
WSA_FLAG_OVERLAPPED);
GPR_ASSERT(cli_sock != INVALID_SOCKET);
- GPR_ASSERT(WSAConnect(cli_sock, (struct sockaddr*)&addr, addr_len, NULL, NULL,
+ GPR_ASSERT(WSAConnect(cli_sock, (grpc_sockaddr*)&addr, addr_len, NULL, NULL,
NULL, NULL) == 0);
- svr_sock = accept(lst_sock, (struct sockaddr*)&addr, &addr_len);
+ svr_sock = accept(lst_sock, (grpc_sockaddr*)&addr, &addr_len);
GPR_ASSERT(svr_sock != INVALID_SOCKET);
closesocket(lst_sock);
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 39ce459f1e..8b80070265 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -224,26 +224,26 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
g_event_engine->fd_notify_on_write(fd, closure);
}
-size_t grpc_pollset_size(void) { return g_event_engine->pollset_size; }
+static size_t pollset_size(void) { return g_event_engine->pollset_size; }
-void grpc_pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
+static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
GRPC_POLLING_API_TRACE("pollset_init(%p)", pollset);
g_event_engine->pollset_init(pollset, mu);
}
-void grpc_pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
+static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
GRPC_POLLING_API_TRACE("pollset_shutdown(%p)", pollset);
g_event_engine->pollset_shutdown(pollset, closure);
}
-void grpc_pollset_destroy(grpc_pollset* pollset) {
+static void pollset_destroy(grpc_pollset* pollset) {
GRPC_POLLING_API_TRACE("pollset_destroy(%p)", pollset);
g_event_engine->pollset_destroy(pollset);
}
-grpc_error* grpc_pollset_work(grpc_pollset* pollset,
- grpc_pollset_worker** worker,
- grpc_millis deadline) {
+static grpc_error* pollset_work(grpc_pollset* pollset,
+ grpc_pollset_worker** worker,
+ grpc_millis deadline) {
GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRIdPTR ") begin", pollset,
deadline);
grpc_error* err = g_event_engine->pollset_work(pollset, worker, deadline);
@@ -252,8 +252,8 @@ grpc_error* grpc_pollset_work(grpc_pollset* pollset,
return err;
}
-grpc_error* grpc_pollset_kick(grpc_pollset* pollset,
- grpc_pollset_worker* specific_worker) {
+static grpc_error* pollset_kick(grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker) {
GRPC_POLLING_API_TRACE("pollset_kick(%p, %p)", pollset, specific_worker);
return g_event_engine->pollset_kick(pollset, specific_worker);
}
@@ -264,43 +264,57 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd) {
g_event_engine->pollset_add_fd(pollset, fd);
}
-grpc_pollset_set* grpc_pollset_set_create(void) {
+void pollset_global_init() {}
+void pollset_global_shutdown() {}
+
+grpc_pollset_vtable grpc_posix_pollset_vtable = {
+ pollset_global_init, pollset_global_shutdown,
+ pollset_init, pollset_shutdown,
+ pollset_destroy, pollset_work,
+ pollset_kick, pollset_size};
+
+static grpc_pollset_set* pollset_set_create(void) {
grpc_pollset_set* pss = g_event_engine->pollset_set_create();
GRPC_POLLING_API_TRACE("pollset_set_create(%p)", pss);
return pss;
}
-void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {
+static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
GRPC_POLLING_API_TRACE("pollset_set_destroy(%p)", pollset_set);
g_event_engine->pollset_set_destroy(pollset_set);
}
-void grpc_pollset_set_add_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {
+static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {
GRPC_POLLING_API_TRACE("pollset_set_add_pollset(%p, %p)", pollset_set,
pollset);
g_event_engine->pollset_set_add_pollset(pollset_set, pollset);
}
-void grpc_pollset_set_del_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {
+static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {
GRPC_POLLING_API_TRACE("pollset_set_del_pollset(%p, %p)", pollset_set,
pollset);
g_event_engine->pollset_set_del_pollset(pollset_set, pollset);
}
-void grpc_pollset_set_add_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {
+static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
+ grpc_pollset_set* item) {
GRPC_POLLING_API_TRACE("pollset_set_add_pollset_set(%p, %p)", bag, item);
g_event_engine->pollset_set_add_pollset_set(bag, item);
}
-void grpc_pollset_set_del_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {
+static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
+ grpc_pollset_set* item) {
GRPC_POLLING_API_TRACE("pollset_set_del_pollset_set(%p, %p)", bag, item);
g_event_engine->pollset_set_del_pollset_set(bag, item);
}
+grpc_pollset_set_vtable grpc_posix_pollset_set_vtable = {
+ pollset_set_create, pollset_set_destroy,
+ pollset_set_add_pollset, pollset_set_del_pollset,
+ pollset_set_add_pollset_set, pollset_set_del_pollset_set};
+
void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
GRPC_POLLING_API_TRACE("pollset_set_add_fd(%p, %d)", pollset_set,
grpc_fd_wrapped_fd(fd));
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index de97164f02..e4edcf67f4 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -171,6 +171,10 @@ on outside context */
return reinterpret_cast<ExecCtx*>(gpr_tls_get(&exec_ctx_));
}
+ static void Set(ExecCtx* exec_ctx) {
+ gpr_tls_set(&exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
+ }
+
protected:
/** Check if ready to finish */
virtual bool CheckReadyToFinish() { return false; }
@@ -180,9 +184,6 @@ on outside context */
private:
/** Set exec_ctx_ to exec_ctx */
- void Set(ExecCtx* exec_ctx) {
- gpr_tls_set(&exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
- }
grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
CombinerData combiner_data_ = {nullptr, nullptr};
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index 3c2b83a549..468814eaee 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -47,6 +47,7 @@ static grpc_iomgr_object g_root_object;
void grpc_iomgr_init() {
grpc_core::ExecCtx exec_ctx;
+ grpc_determine_iomgr_platform();
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc
new file mode 100644
index 0000000000..d34c8e7cd1
--- /dev/null
+++ b/src/core/lib/iomgr/iomgr_custom.cc
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/thd_id.h>
+
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/pollset_custom.h"
+#include "src/core/lib/iomgr/pollset_set_custom.h"
+#include "src/core/lib/iomgr/resolve_address_custom.h"
+
+gpr_thd_id g_init_thread;
+
+static void iomgr_platform_init(void) {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_executor_set_threading(false);
+ g_init_thread = gpr_thd_currentid();
+ grpc_pollset_global_init();
+}
+static void iomgr_platform_flush(void) {}
+static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
+
+static grpc_iomgr_platform_vtable vtable = {
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+
+void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
+ grpc_custom_resolver_vtable* resolver,
+ grpc_custom_timer_vtable* timer,
+ grpc_custom_poller_vtable* poller) {
+ grpc_custom_endpoint_init(socket);
+ grpc_custom_timer_init(timer);
+ grpc_custom_pollset_init(poller);
+ grpc_custom_pollset_set_init();
+ grpc_custom_resolver_init(resolver);
+ grpc_set_iomgr_platform_vtable(&vtable);
+}
+
+#ifdef GRPC_CUSTOM_SOCKET
+grpc_iomgr_platform_vtable* grpc_default_iomgr_platform_vtable() {
+ return &vtable;
+}
+#endif
diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_custom.h
index 4d62f00ad6..ceb6c65db2 100644
--- a/src/core/lib/iomgr/iomgr_uv.h
+++ b/src/core/lib/iomgr/iomgr_custom.h
@@ -16,24 +16,32 @@
*
*/
-#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_UV_H
-#define GRPC_CORE_LIB_IOMGR_IOMGR_UV_H
+#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_CUSTOM_H
+#define GRPC_CORE_LIB_IOMGR_IOMGR_CUSTOM_H
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/pollset_custom.h"
+#include "src/core/lib/iomgr/resolve_address_custom.h"
+#include "src/core/lib/iomgr/tcp_custom.h"
+#include "src/core/lib/iomgr/timer_custom.h"
#include <grpc/support/thd_id.h>
/* The thread ID of the thread on which grpc was initialized. Used to verify
- * that all calls into libuv are made on that same thread */
+ * that all calls into the custom iomgr are made on that same thread */
extern gpr_thd_id g_init_thread;
-#ifdef GRPC_UV_THREAD_CHECK
-#define GRPC_UV_ASSERT_SAME_THREAD() \
+#ifdef GRPC_CUSTOM_IOMGR_THREAD_CHECK
+#define GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD() \
GPR_ASSERT(gpr_thd_currentid() == g_init_thread)
#else
-#define GRPC_UV_ASSERT_SAME_THREAD()
+#define GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD()
#endif /* GRPC_UV_THREAD_CHECK */
-#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */
+void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
+ grpc_custom_resolver_vtable* resolver,
+ grpc_custom_timer_vtable* timer,
+ grpc_custom_poller_vtable* poller);
+
+#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_CUSTOM_H */
diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc
new file mode 100644
index 0000000000..32dbabb79d
--- /dev/null
+++ b/src/core/lib/iomgr/iomgr_internal.cc
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <stddef.h>
+
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/iomgr/timer_manager.h"
+
+static grpc_iomgr_platform_vtable* iomgr_platform_vtable = nullptr;
+
+void grpc_set_iomgr_platform_vtable(grpc_iomgr_platform_vtable* vtable) {
+ iomgr_platform_vtable = vtable;
+}
+
+void grpc_determine_iomgr_platform() {
+ if (iomgr_platform_vtable == nullptr) {
+ grpc_set_default_iomgr_platform();
+ }
+}
+
+void grpc_iomgr_platform_init() { iomgr_platform_vtable->init(); }
+
+void grpc_iomgr_platform_flush() { iomgr_platform_vtable->flush(); }
+
+void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); }
diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h
index 644219fb4d..b011d9c7b1 100644
--- a/src/core/lib/iomgr/iomgr_internal.h
+++ b/src/core/lib/iomgr/iomgr_internal.h
@@ -31,9 +31,21 @@ typedef struct grpc_iomgr_object {
struct grpc_iomgr_object* prev;
} grpc_iomgr_object;
+typedef struct grpc_iomgr_platform_vtable {
+ void (*init)(void);
+ void (*flush)(void);
+ void (*shutdown)(void);
+} grpc_iomgr_platform_vtable;
+
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name);
void grpc_iomgr_unregister_object(grpc_iomgr_object* obj);
+void grpc_determine_iomgr_platform();
+
+void grpc_set_iomgr_platform_vtable(grpc_iomgr_platform_vtable* vtable);
+
+void grpc_set_default_iomgr_platform();
+
void grpc_iomgr_platform_init(void);
/** flush any globally queued work from iomgr */
void grpc_iomgr_platform_flush(void);
diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc
index 35b8adf01e..66c9cb7ff7 100644
--- a/src/core/lib/iomgr/iomgr_posix.cc
+++ b/src/core/lib/iomgr/iomgr_posix.cc
@@ -24,19 +24,44 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_posix.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/iomgr/timer.h"
-void grpc_iomgr_platform_init(void) {
+extern grpc_tcp_server_vtable grpc_posix_tcp_server_vtable;
+extern grpc_tcp_client_vtable grpc_posix_tcp_client_vtable;
+extern grpc_timer_vtable grpc_generic_timer_vtable;
+extern grpc_pollset_vtable grpc_posix_pollset_vtable;
+extern grpc_pollset_set_vtable grpc_posix_pollset_set_vtable;
+extern grpc_address_resolver_vtable grpc_posix_resolver_vtable;
+
+static void iomgr_platform_init(void) {
grpc_wakeup_fd_global_init();
grpc_event_engine_init();
}
-void grpc_iomgr_platform_flush(void) {}
+static void iomgr_platform_flush(void) {}
-void grpc_iomgr_platform_shutdown(void) {
+static void iomgr_platform_shutdown(void) {
grpc_event_engine_shutdown();
grpc_wakeup_fd_global_destroy();
}
+static grpc_iomgr_platform_vtable vtable = {
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+
+void grpc_set_default_iomgr_platform() {
+ grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);
+ grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
+ grpc_set_timer_impl(&grpc_generic_timer_vtable);
+ grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
+ grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
+ grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
+ grpc_set_iomgr_platform_vtable(&vtable);
+}
+
#endif /* GRPC_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/iomgr_uv.cc b/src/core/lib/iomgr/iomgr_uv.cc
index c11c37ca20..4a984446db 100644
--- a/src/core/lib/iomgr/iomgr_uv.cc
+++ b/src/core/lib/iomgr/iomgr_uv.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016 gRPC authors.
+ * Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,26 +20,21 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_UV
+#if defined(GRPC_CUSTOM_SOCKET) && defined(GRPC_UV)
-#include <grpc/support/thd_id.h>
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/pollset_custom.h"
+#include "src/core/lib/iomgr/tcp_custom.h"
+#include "src/core/lib/iomgr/timer_custom.h"
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/executor.h"
-#include "src/core/lib/iomgr/iomgr_uv.h"
-#include "src/core/lib/iomgr/pollset_uv.h"
-#include "src/core/lib/iomgr/tcp_uv.h"
+extern grpc_socket_vtable grpc_uv_socket_vtable;
+extern grpc_custom_resolver_vtable uv_resolver_vtable;
+extern grpc_custom_timer_vtable uv_timer_vtable;
+extern grpc_custom_poller_vtable uv_pollset_vtable;
-gpr_thd_id g_init_thread;
-
-void grpc_iomgr_platform_init(void) {
- grpc_core::ExecCtx exec_ctx;
- grpc_pollset_global_init();
-
- grpc_executor_set_threading(false);
- g_init_thread = gpr_thd_currentid();
+void grpc_set_default_iomgr_platform() {
+ grpc_custom_iomgr_init(&grpc_uv_socket_vtable, &uv_resolver_vtable,
+ &uv_timer_vtable, &uv_pollset_vtable);
}
-void grpc_iomgr_platform_flush(void) {}
-void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
-
-#endif /* GRPC_UV */
+#endif
diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc
index 8c4888ca97..cdef89cbf0 100644
--- a/src/core/lib/iomgr/iomgr_windows.cc
+++ b/src/core/lib/iomgr/iomgr_windows.cc
@@ -29,7 +29,18 @@
#include "src/core/lib/iomgr/iocp_windows.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/pollset_windows.h"
+#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/socket_windows.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/iomgr/timer.h"
+
+extern grpc_tcp_server_vtable grpc_windows_tcp_server_vtable;
+extern grpc_tcp_client_vtable grpc_windows_tcp_client_vtable;
+extern grpc_timer_vtable grpc_generic_timer_vtable;
+extern grpc_pollset_vtable grpc_windows_pollset_vtable;
+extern grpc_pollset_set_vtable grpc_windows_pollset_set_vtable;
+extern grpc_address_resolver_vtable grpc_windows_resolver_vtable;
/* Windows' io manager is going to be fully designed using IO completion
ports. All of what we're doing here is basically make sure that
@@ -46,18 +57,31 @@ static void winsock_shutdown(void) {
GPR_ASSERT(status == 0);
}
-void grpc_iomgr_platform_init(void) {
+static void iomgr_platform_init(void) {
winsock_init();
grpc_iocp_init();
grpc_pollset_global_init();
}
-void grpc_iomgr_platform_flush(void) { grpc_iocp_flush(); }
+static void iomgr_platform_flush(void) { grpc_iocp_flush(); }
-void grpc_iomgr_platform_shutdown(void) {
+static void iomgr_platform_shutdown(void) {
grpc_pollset_global_shutdown();
grpc_iocp_shutdown();
winsock_shutdown();
}
+static grpc_iomgr_platform_vtable vtable = {
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+
+void grpc_set_default_iomgr_platform() {
+ grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable);
+ grpc_set_tcp_server_impl(&grpc_windows_tcp_server_vtable);
+ grpc_set_timer_impl(&grpc_generic_timer_vtable);
+ grpc_set_pollset_vtable(&grpc_windows_pollset_vtable);
+ grpc_set_pollset_set_vtable(&grpc_windows_pollset_set_vtable);
+ grpc_set_resolver_impl(&grpc_windows_resolver_vtable);
+ grpc_set_iomgr_platform_vtable(&vtable);
+}
+
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/pollset.cc b/src/core/lib/iomgr/pollset.cc
new file mode 100644
index 0000000000..ebfef1dbc7
--- /dev/null
+++ b/src/core/lib/iomgr/pollset.cc
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/pollset.h"
+
+grpc_pollset_vtable* grpc_pollset_impl;
+
+void grpc_set_pollset_vtable(grpc_pollset_vtable* vtable) {
+ grpc_pollset_impl = vtable;
+}
+
+void grpc_pollset_global_init() { grpc_pollset_impl->global_init(); }
+
+void grpc_pollset_global_shutdown() { grpc_pollset_impl->global_shutdown(); }
+
+void grpc_pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
+ grpc_pollset_impl->init(pollset, mu);
+}
+
+void grpc_pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
+ grpc_pollset_impl->shutdown(pollset, closure);
+}
+
+void grpc_pollset_destroy(grpc_pollset* pollset) {
+ grpc_pollset_impl->destroy(pollset);
+}
+
+grpc_error* grpc_pollset_work(grpc_pollset* pollset,
+ grpc_pollset_worker** worker,
+ grpc_millis deadline) {
+ return grpc_pollset_impl->work(pollset, worker, deadline);
+}
+
+grpc_error* grpc_pollset_kick(grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker) {
+ return grpc_pollset_impl->kick(pollset, specific_worker);
+}
+
+size_t grpc_pollset_size(void) { return grpc_pollset_impl->pollset_size(); }
diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h
index 9cc3e4c7fa..28472b360d 100644
--- a/src/core/lib/iomgr/pollset.h
+++ b/src/core/lib/iomgr/pollset.h
@@ -38,6 +38,24 @@ extern grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount;
typedef struct grpc_pollset grpc_pollset;
typedef struct grpc_pollset_worker grpc_pollset_worker;
+typedef struct grpc_pollset_vtable {
+ void (*global_init)(void);
+ void (*global_shutdown)(void);
+ void (*init)(grpc_pollset* pollset, gpr_mu** mu);
+ void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
+ void (*destroy)(grpc_pollset* pollset);
+ grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
+ grpc_millis deadline);
+ grpc_error* (*kick)(grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker);
+ size_t (*pollset_size)(void);
+} grpc_pollset_vtable;
+
+void grpc_set_pollset_vtable(grpc_pollset_vtable* vtable);
+
+void grpc_pollset_global_init(void);
+void grpc_pollset_global_shutdown(void);
+
size_t grpc_pollset_size(void);
/* Initialize a pollset: assumes *pollset contains all zeros */
void grpc_pollset_init(grpc_pollset* pollset, gpr_mu** mu);
diff --git a/src/core/lib/iomgr/pollset_custom.cc b/src/core/lib/iomgr/pollset_custom.cc
new file mode 100644
index 0000000000..04bd104055
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_custom.cc
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <stddef.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_custom.h"
+#include "src/core/lib/iomgr/timer.h"
+
+#include "src/core/lib/debug/trace.h"
+
+static grpc_custom_poller_vtable* poller_vtable;
+
+struct grpc_pollset {
+ gpr_mu mu;
+};
+
+static size_t pollset_size() { return sizeof(grpc_pollset); }
+
+static void pollset_global_init() { poller_vtable->init(); }
+
+static void pollset_global_shutdown() { poller_vtable->shutdown(); }
+
+static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ gpr_mu_init(&pollset->mu);
+ *mu = &pollset->mu;
+}
+
+static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
+}
+
+static void pollset_destroy(grpc_pollset* pollset) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ gpr_mu_destroy(&pollset->mu);
+}
+
+static grpc_error* pollset_work(grpc_pollset* pollset,
+ grpc_pollset_worker** worker_hdl,
+ grpc_millis deadline) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ gpr_mu_unlock(&pollset->mu);
+ grpc_millis now = grpc_core::ExecCtx::Get()->Now();
+ size_t timeout = 0;
+ if (deadline > now) {
+ timeout = deadline - now;
+ }
+ // We yield here because the poll() call might yield
+ // control back to the application
+ grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get();
+ grpc_core::ExecCtx::Set(nullptr);
+ poller_vtable->poll(timeout);
+ grpc_core::ExecCtx::Set(curr);
+ grpc_core::ExecCtx::Get()->InvalidateNow();
+ if (grpc_core::ExecCtx::Get()->HasWork()) {
+ grpc_core::ExecCtx::Get()->Flush();
+ }
+ gpr_mu_lock(&pollset->mu);
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* pollset_kick(grpc_pollset* pollset,
+ grpc_pollset_worker* specific_worker) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ poller_vtable->kick();
+ return GRPC_ERROR_NONE;
+}
+
+grpc_pollset_vtable custom_pollset_vtable = {
+ pollset_global_init, pollset_global_shutdown,
+ pollset_init, pollset_shutdown,
+ pollset_destroy, pollset_work,
+ pollset_kick, pollset_size};
+
+void grpc_custom_pollset_init(grpc_custom_poller_vtable* vtable) {
+ poller_vtable = vtable;
+ grpc_set_pollset_vtable(&custom_pollset_vtable);
+}
diff --git a/src/core/lib/iomgr/pollset_custom.h b/src/core/lib/iomgr/pollset_custom.h
new file mode 100644
index 0000000000..9e2027f7f4
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_custom.h
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_CUSTOM_H
+#define GRPC_CORE_LIB_IOMGR_POLLSET_CUSTOM_H
+
+#include <grpc/support/port_platform.h>
+
+#include <stddef.h>
+
+typedef struct grpc_custom_poller_vtable {
+ void (*init)();
+ void (*poll)(size_t timeout_ms);
+ void (*kick)();
+ void (*shutdown)();
+} grpc_custom_poller_vtable;
+
+void grpc_custom_pollset_init(grpc_custom_poller_vtable* vtable);
+
+#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_CUSTOM_H */
diff --git a/src/core/lib/iomgr/pollset_set_uv.cc b/src/core/lib/iomgr/pollset_set.cc
index 50814c1f0a..42a647a737 100644
--- a/src/core/lib/iomgr/pollset_set_uv.cc
+++ b/src/core/lib/iomgr/pollset_set.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016 gRPC authors.
+ * Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,28 +18,38 @@
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/port.h"
+#include "src/core/lib/iomgr/pollset_set.h"
-#ifdef GRPC_UV
+grpc_pollset_set_vtable* grpc_pollset_set_impl;
-#include "src/core/lib/iomgr/pollset_set.h"
+void grpc_set_pollset_set_vtable(grpc_pollset_set_vtable* vtable) {
+ grpc_pollset_set_impl = vtable;
+}
-grpc_pollset_set* grpc_pollset_set_create(void) {
- return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
+grpc_pollset_set* grpc_pollset_set_create() {
+ return grpc_pollset_set_impl->create();
}
-void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
+void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {
+ grpc_pollset_set_impl->destroy(pollset_set);
+}
void grpc_pollset_set_add_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {}
+ grpc_pollset* pollset) {
+ grpc_pollset_set_impl->add_pollset(pollset_set, pollset);
+}
void grpc_pollset_set_del_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {}
+ grpc_pollset* pollset) {
+ grpc_pollset_set_impl->del_pollset(pollset_set, pollset);
+}
void grpc_pollset_set_add_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {}
+ grpc_pollset_set* item) {
+ grpc_pollset_set_impl->add_pollset_set(bag, item);
+}
void grpc_pollset_set_del_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {}
-
-#endif /* GRPC_UV */
+ grpc_pollset_set* item) {
+ grpc_pollset_set_impl->del_pollset_set(bag, item);
+}
diff --git a/src/core/lib/iomgr/pollset_set.h b/src/core/lib/iomgr/pollset_set.h
index 18f30aa94e..d3355b8ff8 100644
--- a/src/core/lib/iomgr/pollset_set.h
+++ b/src/core/lib/iomgr/pollset_set.h
@@ -30,6 +30,17 @@
typedef struct grpc_pollset_set grpc_pollset_set;
+typedef struct grpc_pollset_set_vtable {
+ grpc_pollset_set* (*create)(void);
+ void (*destroy)(grpc_pollset_set* pollset_set);
+ void (*add_pollset)(grpc_pollset_set* pollset_set, grpc_pollset* pollset);
+ void (*del_pollset)(grpc_pollset_set* pollset_set, grpc_pollset* pollset);
+ void (*add_pollset_set)(grpc_pollset_set* bag, grpc_pollset_set* item);
+ void (*del_pollset_set)(grpc_pollset_set* bag, grpc_pollset_set* item);
+} grpc_pollset_set_vtable;
+
+void grpc_set_pollset_set_vtable(grpc_pollset_set_vtable* vtable);
+
grpc_pollset_set* grpc_pollset_set_create(void);
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set);
void grpc_pollset_set_add_pollset(grpc_pollset_set* pollset_set,
diff --git a/src/core/lib/iomgr/pollset_set_custom.cc b/src/core/lib/iomgr/pollset_set_custom.cc
new file mode 100644
index 0000000000..b1ee66020d
--- /dev/null
+++ b/src/core/lib/iomgr/pollset_set_custom.cc
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/pollset_set.h"
+
+grpc_pollset_set* pollset_set_create(void) {
+ return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
+}
+
+void pollset_set_destroy(grpc_pollset_set* pollset_set) {}
+
+void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
+
+void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
+
+void pollset_set_add_pollset_set(grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+void pollset_set_del_pollset_set(grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+static grpc_pollset_set_vtable vtable = {
+ pollset_set_create, pollset_set_destroy,
+ pollset_set_add_pollset, pollset_set_del_pollset,
+ pollset_set_add_pollset_set, pollset_set_del_pollset_set};
+
+void grpc_custom_pollset_set_init() { grpc_set_pollset_set_vtable(&vtable); }
diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/pollset_set_custom.h
index 093b2d085d..80e19a1fef 100644
--- a/src/core/lib/iomgr/timer_uv.h
+++ b/src/core/lib/iomgr/pollset_set_custom.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016 gRPC authors.
+ * Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,19 +16,11 @@
*
*/
-#ifndef GRPC_CORE_LIB_IOMGR_TIMER_UV_H
-#define GRPC_CORE_LIB_IOMGR_TIMER_UV_H
+#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_SET_CUSTOM_H
+#define GRPC_CORE_LIB_IOMGR_POLLSET_SET_CUSTOM_H
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/exec_ctx.h"
+void grpc_custom_pollset_set_init();
-struct grpc_timer {
- grpc_closure* closure;
- /* This is actually a uv_timer_t*, but we want to keep platform-specific
- types out of headers */
- void* uv_timer;
- int pending;
-};
-
-#endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */
+#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_SET_CUSTOM_H */
diff --git a/src/core/lib/iomgr/pollset_set_windows.cc b/src/core/lib/iomgr/pollset_set_windows.cc
index ff3f6a944e..bb9e7f5d28 100644
--- a/src/core/lib/iomgr/pollset_set_windows.cc
+++ b/src/core/lib/iomgr/pollset_set_windows.cc
@@ -25,22 +25,27 @@
#include "src/core/lib/iomgr/pollset_set_windows.h"
-grpc_pollset_set* grpc_pollset_set_create(void) {
+static grpc_pollset_set* pollset_set_create(void) {
return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
}
-void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
+static void pollset_set_destroy(grpc_pollset_set* pollset_set) {}
-void grpc_pollset_set_add_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {}
+static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
-void grpc_pollset_set_del_pollset(grpc_pollset_set* pollset_set,
- grpc_pollset* pollset) {}
+static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
+ grpc_pollset* pollset) {}
-void grpc_pollset_set_add_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {}
+static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
-void grpc_pollset_set_del_pollset_set(grpc_pollset_set* bag,
- grpc_pollset_set* item) {}
+static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
+ grpc_pollset_set* item) {}
+
+grpc_pollset_set_vtable grpc_windows_pollset_set_vtable = {
+ pollset_set_create, pollset_set_destroy,
+ pollset_set_add_pollset, pollset_set_del_pollset,
+ pollset_set_add_pollset_set, pollset_set_del_pollset_set};
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc
index c6a2f43bf1..bade6eae6c 100644
--- a/src/core/lib/iomgr/pollset_uv.cc
+++ b/src/core/lib/iomgr/pollset_uv.cc
@@ -22,137 +22,72 @@
#ifdef GRPC_UV
-#include <uv.h>
-
-#include <string.h>
-
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
-
-#include "src/core/lib/iomgr/iomgr_uv.h"
-#include "src/core/lib/iomgr/pollset.h"
-#include "src/core/lib/iomgr/pollset_uv.h"
+#include "src/core/lib/iomgr/pollset_custom.h"
-#include "src/core/lib/debug/trace.h"
-
-grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
-
-struct grpc_pollset {
- uv_timer_t* timer;
- int shutting_down;
-};
+#include <uv.h>
/* Indicates that grpc_pollset_work should run an iteration of the UV loop
before running callbacks. This defaults to 1, and should be disabled if
grpc_pollset_work will be called within the callstack of uv_run */
-int grpc_pollset_work_run_loop;
-
-gpr_mu grpc_polling_mu;
+int grpc_pollset_work_run_loop = 1;
-/* This is used solely to kick the uv loop, by setting a callback to be run
- immediately in the next loop iteration.
- Note: In the future, if there is a bug that involves missing wakeups in the
- future, try adding a uv_async_t to kick the loop differently */
-uv_timer_t* dummy_uv_handle;
+static bool g_kicked = false;
-size_t grpc_pollset_size() { return sizeof(grpc_pollset); }
+typedef struct uv_poller_handle {
+ uv_timer_t poll_timer;
+ uv_timer_t kick_timer;
+ int refs;
+} uv_poller_handle;
-void dummy_timer_cb(uv_timer_t* handle) {}
+static uv_poller_handle* g_handle;
-void dummy_handle_close_cb(uv_handle_t* handle) { gpr_free(handle); }
-
-void grpc_pollset_global_init(void) {
- gpr_mu_init(&grpc_polling_mu);
- dummy_uv_handle = (uv_timer_t*)gpr_malloc(sizeof(uv_timer_t));
- uv_timer_init(uv_default_loop(), dummy_uv_handle);
- grpc_pollset_work_run_loop = 1;
-}
-
-void grpc_pollset_global_shutdown(void) {
- GRPC_UV_ASSERT_SAME_THREAD();
- gpr_mu_destroy(&grpc_polling_mu);
- uv_close((uv_handle_t*)dummy_uv_handle, dummy_handle_close_cb);
+static void init() {
+ g_handle = (uv_poller_handle*)gpr_malloc(sizeof(uv_poller_handle));
+ g_handle->refs = 2;
+ uv_timer_init(uv_default_loop(), &g_handle->poll_timer);
+ uv_timer_init(uv_default_loop(), &g_handle->kick_timer);
}
-static void timer_run_cb(uv_timer_t* timer) {}
+static void empty_timer_cb(uv_timer_t* handle) {}
-static void timer_close_cb(uv_handle_t* handle) {
- handle->data = (void*)1;
- gpr_free(handle);
-}
+static void kick_timer_cb(uv_timer_t* handle) { g_kicked = false; }
-void grpc_pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
- GRPC_UV_ASSERT_SAME_THREAD();
- *mu = &grpc_polling_mu;
- pollset->timer = (uv_timer_t*)gpr_malloc(sizeof(uv_timer_t));
- uv_timer_init(uv_default_loop(), pollset->timer);
- pollset->shutting_down = 0;
+static void run_loop(size_t timeout) {
+ if (grpc_pollset_work_run_loop) {
+ if (timeout == 0) {
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ } else {
+ uv_timer_start(&g_handle->poll_timer, empty_timer_cb, timeout, 0);
+ uv_run(uv_default_loop(), UV_RUN_ONCE);
+ uv_timer_stop(&g_handle->poll_timer);
+ }
+ }
}
-void grpc_pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
- GPR_ASSERT(!pollset->shutting_down);
- GRPC_UV_ASSERT_SAME_THREAD();
- pollset->shutting_down = 1;
- if (grpc_pollset_work_run_loop) {
- // Drain any pending UV callbacks without blocking
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
- } else {
- // kick the loop once
- uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
+static void kick() {
+ if (!g_kicked) {
+ g_kicked = true;
+ uv_timer_start(&g_handle->kick_timer, kick_timer_cb, 0, 0);
}
- GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
}
-void grpc_pollset_destroy(grpc_pollset* pollset) {
- GRPC_UV_ASSERT_SAME_THREAD();
- uv_close((uv_handle_t*)pollset->timer, timer_close_cb);
- // timer.data is a boolean indicating that the timer has finished closing
- pollset->timer->data = (void*)0;
- if (grpc_pollset_work_run_loop) {
- while (!pollset->timer->data) {
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
- }
+static void close_timer_cb(uv_handle_t* handle) {
+ g_handle->refs--;
+ if (g_handle->refs == 0) {
+ gpr_free(g_handle);
}
}
-grpc_error* grpc_pollset_work(grpc_pollset* pollset,
- grpc_pollset_worker** worker_hdl,
- grpc_millis deadline) {
- uint64_t timeout;
- GRPC_UV_ASSERT_SAME_THREAD();
- gpr_mu_unlock(&grpc_polling_mu);
+static void shutdown() {
+ uv_close((uv_handle_t*)&g_handle->poll_timer, close_timer_cb);
+ uv_close((uv_handle_t*)&g_handle->kick_timer, close_timer_cb);
if (grpc_pollset_work_run_loop) {
- grpc_millis now = grpc_core::ExecCtx::Get()->Now();
- if (deadline >= now) {
- timeout = deadline - now;
- } else {
- timeout = 0;
- }
- /* We special-case timeout=0 so that we don't bother with the timer when
- the loop won't block anyway */
- if (timeout > 0) {
- uv_timer_start(pollset->timer, timer_run_cb, timeout, 0);
- /* Run until there is some I/O activity or the timer triggers. It doesn't
- matter which happens */
- uv_run(uv_default_loop(), UV_RUN_ONCE);
- uv_timer_stop(pollset->timer);
- } else {
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
- }
- }
- if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) {
- grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(uv_run(uv_default_loop(), UV_RUN_DEFAULT) == 0);
}
- gpr_mu_lock(&grpc_polling_mu);
- return GRPC_ERROR_NONE;
}
-grpc_error* grpc_pollset_kick(grpc_pollset* pollset,
- grpc_pollset_worker* specific_worker) {
- GRPC_UV_ASSERT_SAME_THREAD();
- uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
- return GRPC_ERROR_NONE;
-}
+grpc_custom_poller_vtable uv_pollset_vtable = {init, run_loop, kick, shutdown};
#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h
index 566c110ca6..de82bcc1d3 100644
--- a/src/core/lib/iomgr/pollset_uv.h
+++ b/src/core/lib/iomgr/pollset_uv.h
@@ -21,7 +21,12 @@
extern int grpc_pollset_work_run_loop;
-void grpc_pollset_global_init(void);
-void grpc_pollset_global_shutdown(void);
+typedef struct grpc_custom_poller_vtable {
+ void (*init)(void);
+ void (*run_loop)(int blocking);
+} grpc_custom_poller_vtable;
+
+void grpc_custom_pollset_global_init(grpc_custom_poller_vtable* vtable);
+void grpc_custom_pollset_global_shutdown(void);
#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */
diff --git a/src/core/lib/iomgr/pollset_windows.cc b/src/core/lib/iomgr/pollset_windows.cc
index c1b83ddc14..e9a808d8ad 100644
--- a/src/core/lib/iomgr/pollset_windows.cc
+++ b/src/core/lib/iomgr/pollset_windows.cc
@@ -38,7 +38,7 @@ gpr_mu grpc_polling_mu;
static grpc_pollset_worker* g_active_poller;
static grpc_pollset_worker g_global_root_worker;
-void grpc_pollset_global_init(void) {
+static void pollset_global_init(void) {
gpr_mu_init(&grpc_polling_mu);
g_active_poller = NULL;
g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next =
@@ -46,7 +46,7 @@ void grpc_pollset_global_init(void) {
&g_global_root_worker;
}
-void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
+static void pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); }
static void remove_worker(grpc_pollset_worker* worker,
grpc_pollset_worker_link_type type) {
@@ -80,21 +80,21 @@ static void push_front_worker(grpc_pollset_worker* root,
worker->links[type].next->links[type].prev = worker;
}
-size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); }
+static size_t pollset_size(void) { return sizeof(grpc_pollset); }
/* There isn't really any such thing as a pollset under Windows, due to the
nature of the IO completion ports. We're still going to provide a minimal
set of features for the sake of the rest of grpc. But grpc_pollset_work
won't actually do any polling, and return as quickly as possible. */
-void grpc_pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
+static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
*mu = &grpc_polling_mu;
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next =
pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev =
&pollset->root_worker;
}
-void grpc_pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
+static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
@@ -104,11 +104,11 @@ void grpc_pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
}
}
-void grpc_pollset_destroy(grpc_pollset* pollset) {}
+static void pollset_destroy(grpc_pollset* pollset) {}
-grpc_error* grpc_pollset_work(grpc_pollset* pollset,
- grpc_pollset_worker** worker_hdl,
- grpc_millis deadline) {
+static grpc_error* pollset_work(grpc_pollset* pollset,
+ grpc_pollset_worker** worker_hdl,
+ grpc_millis deadline) {
grpc_pollset_worker worker;
if (worker_hdl) *worker_hdl = &worker;
@@ -182,8 +182,8 @@ done:
return GRPC_ERROR_NONE;
}
-grpc_error* grpc_pollset_kick(grpc_pollset* p,
- grpc_pollset_worker* specific_worker) {
+static grpc_error* pollset_kick(grpc_pollset* p,
+ grpc_pollset_worker* specific_worker) {
if (specific_worker != NULL) {
if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
for (specific_worker =
@@ -220,4 +220,10 @@ grpc_error* grpc_pollset_kick(grpc_pollset* p,
return GRPC_ERROR_NONE;
}
+grpc_pollset_vtable grpc_windows_pollset_vtable = {
+ pollset_global_init, pollset_global_shutdown,
+ pollset_init, pollset_shutdown,
+ pollset_destroy, pollset_work,
+ pollset_kick, pollset_size};
+
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h
index 25090898ed..c1dcc52618 100644
--- a/src/core/lib/iomgr/port.h
+++ b/src/core/lib/iomgr/port.h
@@ -21,8 +21,11 @@
#ifndef GRPC_CORE_LIB_IOMGR_PORT_H
#define GRPC_CORE_LIB_IOMGR_PORT_H
-#if defined(GRPC_UV)
-// Do nothing
+#ifdef GRPC_UV
+#define GRPC_CUSTOM_SOCKET
+#endif
+#if defined(GRPC_CUSTOM_SOCKET)
+// Do Nothing
#elif defined(GPR_MANYLINUX1)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
@@ -33,13 +36,10 @@
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
-#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
-#define GRPC_TIMER_USE_GENERIC 1
#define GRPC_LINUX_EPOLL 1
#elif defined(GPR_WINDOWS)
-#define GRPC_TIMER_USE_GENERIC 1
#define GRPC_WINSOCK_SOCKET 1
#define GRPC_WINDOWS_SOCKETUTILS 1
#elif defined(GPR_ANDROID)
@@ -49,10 +49,8 @@
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_LINUX_EVENTFD 1
#define GRPC_POSIX_SOCKET 1
-#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
-#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_LINUX)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
@@ -64,9 +62,7 @@
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_HOST_NAME_MAX 1
#define GRPC_POSIX_SOCKET 1
-#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_WAKEUP_FD 1
-#define GRPC_TIMER_USE_GENERIC 1
#ifdef __GLIBC_PREREQ
#if __GLIBC_PREREQ(2, 4)
#define GRPC_LINUX_EPOLL 1
@@ -100,11 +96,9 @@
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
-#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_SYSCONF 1
#define GRPC_POSIX_WAKEUP_FD 1
-#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_FREEBSD)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_HAVE_IFADDRS 1
@@ -114,36 +108,31 @@
#define GRPC_POSIX_FORK 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
-#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
-#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_OPENBSD)
#define GRPC_HAVE_IFADDRS 1
#define GRPC_HAVE_IPV6_RECVPKTINFO 1
#define GRPC_HAVE_UNIX_SOCKET 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
-#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
-#define GRPC_TIMER_USE_GENERIC 1
#elif defined(GPR_NACL)
#define GRPC_HAVE_ARPA_NAMESER 1
#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GRPC_POSIX_SOCKET 1
-#define GRPC_POSIX_SOCKETADDR 1
#define GRPC_POSIX_SOCKETUTILS 1
#define GRPC_POSIX_WAKEUP_FD 1
-#define GRPC_TIMER_USE_GENERIC 1
#elif !defined(GPR_NO_AUTODETECT_PLATFORM)
#error "Platform not recognized"
#endif
#if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \
- defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_UV) != \
+ defined(GRPC_CUSTOM_SOCKET) != \
1
-#error Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GPR_CUSTOM_SOCKET
+#error \
+ "Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GRPC_CUSTOM_SOCKET"
#endif
#if defined(GRPC_POSIX_HOST_NAME_MAX) && defined(GRPC_POSIX_SYSCONF)
diff --git a/src/core/lib/iomgr/resolve_address.cc b/src/core/lib/iomgr/resolve_address.cc
new file mode 100644
index 0000000000..f2a4676369
--- /dev/null
+++ b/src/core/lib/iomgr/resolve_address.cc
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include <grpc/support/alloc.h>
+#include "src/core/lib/iomgr/resolve_address.h"
+
+grpc_address_resolver_vtable* grpc_resolve_address_impl;
+
+void grpc_set_resolver_impl(grpc_address_resolver_vtable* vtable) {
+ grpc_resolve_address_impl = vtable;
+}
+
+void grpc_resolve_address(const char* addr, const char* default_port,
+ grpc_pollset_set* interested_parties,
+ grpc_closure* on_done,
+ grpc_resolved_addresses** addresses) {
+ grpc_resolve_address_impl->resolve_address(
+ addr, default_port, interested_parties, on_done, addresses);
+}
+
+void grpc_resolved_addresses_destroy(grpc_resolved_addresses* addrs) {
+ if (addrs != nullptr) {
+ gpr_free(addrs->addrs);
+ }
+ gpr_free(addrs);
+}
+
+grpc_error* grpc_blocking_resolve_address(const char* name,
+ const char* default_port,
+ grpc_resolved_addresses** addresses) {
+ return grpc_resolve_address_impl->blocking_resolve_address(name, default_port,
+ addresses);
+}
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index 10a7822654..7da5caaa8e 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -37,20 +37,33 @@ typedef struct {
grpc_resolved_address* addrs;
} grpc_resolved_addresses;
+typedef struct grpc_address_resolver_vtable {
+ void (*resolve_address)(const char* addr, const char* default_port,
+ grpc_pollset_set* interested_parties,
+ grpc_closure* on_done,
+ grpc_resolved_addresses** addresses);
+ grpc_error* (*blocking_resolve_address)(const char* name,
+ const char* default_port,
+ grpc_resolved_addresses** addresses);
+} grpc_address_resolver_vtable;
+
+void grpc_set_resolver_impl(grpc_address_resolver_vtable* vtable);
+
/* Asynchronously resolve addr. Use default_port if a port isn't designated
in addr, otherwise use the port in addr. */
/* TODO(ctiller): add a timeout here */
-extern void (*grpc_resolve_address)(const char* addr, const char* default_port,
- grpc_pollset_set* interested_parties,
- grpc_closure* on_done,
- grpc_resolved_addresses** addresses);
+void grpc_resolve_address(const char* addr, const char* default_port,
+ grpc_pollset_set* interested_parties,
+ grpc_closure* on_done,
+ grpc_resolved_addresses** addresses);
+
/* Destroy resolved addresses */
void grpc_resolved_addresses_destroy(grpc_resolved_addresses* addresses);
-/* Resolve addr in a blocking fashion. Returns NULL on failure. On success,
+/* Resolve addr in a blocking fashion. On success,
result must be freed with grpc_resolved_addresses_destroy. */
-extern grpc_error* (*grpc_blocking_resolve_address)(
- const char* name, const char* default_port,
- grpc_resolved_addresses** addresses);
+grpc_error* grpc_blocking_resolve_address(const char* name,
+ const char* default_port,
+ grpc_resolved_addresses** addresses);
#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_H */
diff --git a/src/core/lib/iomgr/resolve_address_custom.cc b/src/core/lib/iomgr/resolve_address_custom.cc
new file mode 100644
index 0000000000..9cf7817f66
--- /dev/null
+++ b/src/core/lib/iomgr/resolve_address_custom.cc
@@ -0,0 +1,187 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/string_util.h>
+
+#include <grpc/support/log.h>
+#include "src/core/lib/gpr/host_port.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/gpr/useful.h"
+
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/resolve_address_custom.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+
+#include <string.h>
+
+typedef struct grpc_custom_resolver {
+ grpc_closure* on_done;
+ grpc_resolved_addresses** addresses;
+ char* host;
+ char* port;
+} grpc_custom_resolver;
+
+static grpc_custom_resolver_vtable* resolve_address_vtable = nullptr;
+
+static int retry_named_port_failure(grpc_custom_resolver* r,
+ grpc_resolved_addresses** res) {
+ // This loop is copied from resolve_address_posix.c
+ const char* svc[][2] = {{"http", "80"}, {"https", "443"}};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
+ if (strcmp(r->port, svc[i][0]) == 0) {
+ gpr_free(r->port);
+ r->port = gpr_strdup(svc[i][1]);
+ if (res) {
+ grpc_error* error =
+ resolve_address_vtable->resolve(r->host, r->port, res);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(error);
+ return 0;
+ }
+ } else {
+ resolve_address_vtable->resolve_async(r, r->host, r->port);
+ }
+ return 1;
+ }
+ }
+ return 0;
+}
+
+void grpc_custom_resolve_callback(grpc_custom_resolver* r,
+ grpc_resolved_addresses* result,
+ grpc_error* error) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ grpc_core::ExecCtx exec_ctx;
+ if (error == GRPC_ERROR_NONE) {
+ *r->addresses = result;
+ } else if (retry_named_port_failure(r, nullptr)) {
+ return;
+ }
+ if (r->on_done) {
+ GRPC_CLOSURE_SCHED(r->on_done, error);
+ }
+ gpr_free(r->host);
+ gpr_free(r->port);
+ gpr_free(r);
+}
+
+static grpc_error* try_split_host_port(const char* name,
+ const char* default_port, char** host,
+ char** port) {
+ /* parse name, splitting it into host and port parts */
+ grpc_error* error;
+ gpr_split_host_port(name, host, port);
+ if (*host == nullptr) {
+ char* msg;
+ gpr_asprintf(&msg, "unparseable host:port: '%s'", name);
+ error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return error;
+ }
+ if (*port == nullptr) {
+ // TODO(murgatroid99): add tests for this case
+ if (default_port == nullptr) {
+ char* msg;
+ gpr_asprintf(&msg, "no port in name '%s'", name);
+ error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+ gpr_free(msg);
+ return error;
+ }
+ *port = gpr_strdup(default_port);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* blocking_resolve_address_impl(
+ const char* name, const char* default_port,
+ grpc_resolved_addresses** addresses) {
+ char* host;
+ char* port;
+ grpc_error* err;
+
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+
+ err = try_split_host_port(name, default_port, &host, &port);
+ if (err != GRPC_ERROR_NONE) {
+ gpr_free(host);
+ gpr_free(port);
+ return err;
+ }
+
+ /* Call getaddrinfo */
+ grpc_custom_resolver resolver;
+ resolver.host = host;
+ resolver.port = port;
+
+ grpc_resolved_addresses* addrs;
+ grpc_core::ExecCtx* curr = grpc_core::ExecCtx::Get();
+ grpc_core::ExecCtx::Set(nullptr);
+ err = resolve_address_vtable->resolve(host, port, &addrs);
+ if (err != GRPC_ERROR_NONE) {
+ if (retry_named_port_failure(&resolver, &addrs)) {
+ GRPC_ERROR_UNREF(err);
+ err = GRPC_ERROR_NONE;
+ }
+ }
+ grpc_core::ExecCtx::Set(curr);
+ if (err == GRPC_ERROR_NONE) {
+ *addresses = addrs;
+ }
+ gpr_free(resolver.host);
+ gpr_free(resolver.port);
+ return err;
+}
+
+static void resolve_address_impl(const char* name, const char* default_port,
+ grpc_pollset_set* interested_parties,
+ grpc_closure* on_done,
+ grpc_resolved_addresses** addrs) {
+ grpc_custom_resolver* r = nullptr;
+ char* host = nullptr;
+ char* port = nullptr;
+ grpc_error* err;
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ err = try_split_host_port(name, default_port, &host, &port);
+ if (err != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(on_done, err);
+ gpr_free(host);
+ gpr_free(port);
+ return;
+ }
+ r = (grpc_custom_resolver*)gpr_malloc(sizeof(grpc_custom_resolver));
+ r->on_done = on_done;
+ r->addresses = addrs;
+ r->host = host;
+ r->port = port;
+
+ /* Call getaddrinfo */
+ resolve_address_vtable->resolve_async(r, r->host, r->port);
+}
+
+static grpc_address_resolver_vtable custom_resolver_vtable = {
+ resolve_address_impl, blocking_resolve_address_impl};
+
+void grpc_custom_resolver_init(grpc_custom_resolver_vtable* impl) {
+ resolve_address_vtable = impl;
+ grpc_set_resolver_impl(&custom_resolver_vtable);
+}
diff --git a/src/core/lib/iomgr/resolve_address_custom.h b/src/core/lib/iomgr/resolve_address_custom.h
new file mode 100644
index 0000000000..e0c6714087
--- /dev/null
+++ b/src/core/lib/iomgr/resolve_address_custom.h
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_CUSTOM_H
+#define GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_CUSTOM_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+
+typedef struct grpc_custom_resolver grpc_custom_resolver;
+
+typedef struct grpc_custom_resolver_vtable {
+ grpc_error* (*resolve)(char* host, char* port, grpc_resolved_addresses** res);
+ void (*resolve_async)(grpc_custom_resolver* resolver, char* host, char* port);
+} grpc_custom_resolver_vtable;
+
+void grpc_custom_resolve_callback(grpc_custom_resolver* resolver,
+ grpc_resolved_addresses* result,
+ grpc_error* error);
+
+/* Internal APIs */
+void grpc_custom_resolver_init(grpc_custom_resolver_vtable* impl);
+
+#endif /* GRPC_CORE_LIB_IOMGR_RESOLVE_ADDRESS_CUSTOM_H */
diff --git a/src/core/lib/iomgr/resolve_address_posix.cc b/src/core/lib/iomgr/resolve_address_posix.cc
index 2f68dbe214..a82075542f 100644
--- a/src/core/lib/iomgr/resolve_address_posix.cc
+++ b/src/core/lib/iomgr/resolve_address_posix.cc
@@ -42,7 +42,7 @@
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
-static grpc_error* blocking_resolve_address_impl(
+static grpc_error* posix_blocking_resolve_address(
const char* name, const char* default_port,
grpc_resolved_addresses** addresses) {
grpc_core::ExecCtx exec_ctx;
@@ -141,10 +141,6 @@ done:
return err;
}
-grpc_error* (*grpc_blocking_resolve_address)(
- const char* name, const char* default_port,
- grpc_resolved_addresses** addresses) = blocking_resolve_address_impl;
-
typedef struct {
char* name;
char* default_port;
@@ -165,17 +161,10 @@ static void do_request_thread(void* rp, grpc_error* error) {
gpr_free(r);
}
-void grpc_resolved_addresses_destroy(grpc_resolved_addresses* addrs) {
- if (addrs != nullptr) {
- gpr_free(addrs->addrs);
- }
- gpr_free(addrs);
-}
-
-static void resolve_address_impl(const char* name, const char* default_port,
- grpc_pollset_set* interested_parties,
- grpc_closure* on_done,
- grpc_resolved_addresses** addrs) {
+static void posix_resolve_address(const char* name, const char* default_port,
+ grpc_pollset_set* interested_parties,
+ grpc_closure* on_done,
+ grpc_resolved_addresses** addrs) {
request* r = static_cast<request*>(gpr_malloc(sizeof(request)));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
@@ -186,9 +175,6 @@ static void resolve_address_impl(const char* name, const char* default_port,
GRPC_CLOSURE_SCHED(&r->request_closure, GRPC_ERROR_NONE);
}
-void (*grpc_resolve_address)(
- const char* name, const char* default_port,
- grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_resolved_addresses** addrs) = resolve_address_impl;
-
+grpc_address_resolver_vtable grpc_posix_resolver_vtable = {
+ posix_resolve_address, posix_blocking_resolve_address};
#endif
diff --git a/src/core/lib/iomgr/resolve_address_uv.cc b/src/core/lib/iomgr/resolve_address_uv.cc
deleted file mode 100644
index 4d8ea596f3..0000000000
--- a/src/core/lib/iomgr/resolve_address_uv.cc
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_UV
-
-#include <uv.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-
-#include "src/core/lib/gpr/host_port.h"
-#include "src/core/lib/gpr/useful.h"
-#include "src/core/lib/iomgr/closure.h"
-#include "src/core/lib/iomgr/error.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/iomgr_uv.h"
-#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/iomgr/sockaddr.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
-
-#include <string.h>
-
-typedef struct request {
- grpc_closure* on_done;
- grpc_resolved_addresses** addresses;
- struct addrinfo* hints;
- char* host;
- char* port;
-} request;
-
-static int retry_named_port_failure(int status, request* r,
- uv_getaddrinfo_cb getaddrinfo_cb) {
- if (status != 0) {
- // This loop is copied from resolve_address_posix.c
- const char* svc[][2] = {{"http", "80"}, {"https", "443"}};
- for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
- if (strcmp(r->port, svc[i][0]) == 0) {
- int retry_status;
- uv_getaddrinfo_t* req =
- (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t));
- req->data = r;
- r->port = gpr_strdup(svc[i][1]);
- retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
- r->host, r->port, r->hints);
- if (retry_status < 0 || getaddrinfo_cb == NULL) {
- // The callback will not be called
- gpr_free(req);
- }
- return retry_status;
- }
- }
- }
- /* If this function calls uv_getaddrinfo, it will return that function's
- return value. That function only returns numbers <=0, so we can safely
- return 1 to indicate that we never retried */
- return 1;
-}
-
-static grpc_error* handle_addrinfo_result(int status, struct addrinfo* result,
- grpc_resolved_addresses** addresses) {
- struct addrinfo* resp;
- size_t i;
- if (status != 0) {
- grpc_error* error;
- *addresses = NULL;
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- return error;
- }
- (*addresses) =
- (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses));
- (*addresses)->naddrs = 0;
- for (resp = result; resp != NULL; resp = resp->ai_next) {
- (*addresses)->naddrs++;
- }
- (*addresses)->addrs = (grpc_resolved_address*)gpr_malloc(
- sizeof(grpc_resolved_address) * (*addresses)->naddrs);
- i = 0;
- for (resp = result; resp != NULL; resp = resp->ai_next) {
- memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
- (*addresses)->addrs[i].len = resp->ai_addrlen;
- i++;
- }
-
- {
- for (i = 0; i < (*addresses)->naddrs; i++) {
- char* buf;
- grpc_sockaddr_to_string(&buf, &(*addresses)->addrs[i], 0);
- gpr_free(buf);
- }
- }
- return GRPC_ERROR_NONE;
-}
-
-static void getaddrinfo_callback(uv_getaddrinfo_t* req, int status,
- struct addrinfo* res) {
- request* r = (request*)req->data;
- grpc_core::ExecCtx exec_ctx;
- grpc_error* error;
- int retry_status;
- char* port = r->port;
-
- gpr_free(req);
- retry_status = retry_named_port_failure(status, r, getaddrinfo_callback);
- if (retry_status == 0) {
- /* The request is being retried. It is using its own port string, so we free
- * the original one */
- gpr_free(port);
- return;
- }
- /* Either no retry was attempted, or the retry failed. Either way, the
- original error probably has more interesting information */
- error = handle_addrinfo_result(status, res, r->addresses);
- GRPC_CLOSURE_SCHED(r->on_done, error);
-
- gpr_free(r->hints);
- gpr_free(r->host);
- gpr_free(r->port);
- gpr_free(r);
- uv_freeaddrinfo(res);
-}
-
-static grpc_error* try_split_host_port(const char* name,
- const char* default_port, char** host,
- char** port) {
- /* parse name, splitting it into host and port parts */
- grpc_error* error;
- gpr_split_host_port(name, host, port);
- if (*host == NULL) {
- char* msg;
- gpr_asprintf(&msg, "unparseable host:port: '%s'", name);
- error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- gpr_free(msg);
- return error;
- }
- if (*port == NULL) {
- // TODO(murgatroid99): add tests for this case
- if (default_port == NULL) {
- char* msg;
- gpr_asprintf(&msg, "no port in name '%s'", name);
- error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
- gpr_free(msg);
- return error;
- }
- *port = gpr_strdup(default_port);
- }
- return GRPC_ERROR_NONE;
-}
-
-static grpc_error* blocking_resolve_address_impl(
- const char* name, const char* default_port,
- grpc_resolved_addresses** addresses) {
- char* host;
- char* port;
- struct addrinfo hints;
- uv_getaddrinfo_t req;
- int s;
- grpc_error* err;
- int retry_status;
- request r;
-
- GRPC_UV_ASSERT_SAME_THREAD();
-
- req.addrinfo = NULL;
-
- err = try_split_host_port(name, default_port, &host, &port);
- if (err != GRPC_ERROR_NONE) {
- goto done;
- }
-
- /* Call getaddrinfo */
- memset(&hints, 0, sizeof(hints));
- hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
- hints.ai_socktype = SOCK_STREAM; /* stream socket */
- hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
-
- s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
- r.addresses = addresses;
- r.hints = &hints;
- r.host = host;
- r.port = port;
- retry_status = retry_named_port_failure(s, &r, NULL);
- if (retry_status <= 0) {
- s = retry_status;
- }
- err = handle_addrinfo_result(s, req.addrinfo, addresses);
-
-done:
- gpr_free(host);
- gpr_free(port);
- if (req.addrinfo) {
- uv_freeaddrinfo(req.addrinfo);
- }
- return err;
-}
-
-grpc_error* (*grpc_blocking_resolve_address)(
- const char* name, const char* default_port,
- grpc_resolved_addresses** addresses) = blocking_resolve_address_impl;
-
-void grpc_resolved_addresses_destroy(grpc_resolved_addresses* addrs) {
- if (addrs != NULL) {
- gpr_free(addrs->addrs);
- }
- gpr_free(addrs);
-}
-
-static void resolve_address_impl(const char* name, const char* default_port,
- grpc_pollset_set* interested_parties,
- grpc_closure* on_done,
- grpc_resolved_addresses** addrs) {
- uv_getaddrinfo_t* req = NULL;
- request* r = NULL;
- struct addrinfo* hints = NULL;
- char* host = NULL;
- char* port = NULL;
- grpc_error* err;
- int s;
- GRPC_UV_ASSERT_SAME_THREAD();
- err = try_split_host_port(name, default_port, &host, &port);
- if (err != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(on_done, err);
- gpr_free(host);
- gpr_free(port);
- return;
- }
- r = (request*)gpr_malloc(sizeof(request));
- r->on_done = on_done;
- r->addresses = addrs;
- r->host = host;
- r->port = port;
- req = (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t));
- req->data = r;
-
- /* Call getaddrinfo */
- hints = (addrinfo*)gpr_malloc(sizeof(struct addrinfo));
- memset(hints, 0, sizeof(struct addrinfo));
- hints->ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
- hints->ai_socktype = SOCK_STREAM; /* stream socket */
- hints->ai_flags = AI_PASSIVE; /* for wildcard IP address */
- r->hints = hints;
-
- s = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_callback, host, port,
- hints);
-
- if (s != 0) {
- *addrs = NULL;
- err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed");
- err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(s)));
- GRPC_CLOSURE_SCHED(on_done, err);
- gpr_free(r);
- gpr_free(req);
- gpr_free(hints);
- gpr_free(host);
- gpr_free(port);
- }
-}
-
-void (*grpc_resolve_address)(
- const char* name, const char* default_port,
- grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_resolved_addresses** addrs) = resolve_address_impl;
-
-#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/resolve_address_windows.cc b/src/core/lib/iomgr/resolve_address_windows.cc
index 7a62c88720..71c92615ad 100644
--- a/src/core/lib/iomgr/resolve_address_windows.cc
+++ b/src/core/lib/iomgr/resolve_address_windows.cc
@@ -51,7 +51,7 @@ typedef struct {
grpc_resolved_addresses** addresses;
} request;
-static grpc_error* blocking_resolve_address_impl(
+static grpc_error* windows_blocking_resolve_address(
const char* name, const char* default_port,
grpc_resolved_addresses** addresses) {
grpc_core::ExecCtx exec_ctx;
@@ -130,10 +130,6 @@ done:
return error;
}
-grpc_error* (*grpc_blocking_resolve_address)(
- const char* name, const char* default_port,
- grpc_resolved_addresses** addresses) = blocking_resolve_address_impl;
-
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
static void do_request_thread(void* rp, grpc_error* error) {
@@ -150,17 +146,10 @@ static void do_request_thread(void* rp, grpc_error* error) {
gpr_free(r);
}
-void grpc_resolved_addresses_destroy(grpc_resolved_addresses* addrs) {
- if (addrs != NULL) {
- gpr_free(addrs->addrs);
- }
- gpr_free(addrs);
-}
-
-static void resolve_address_impl(const char* name, const char* default_port,
- grpc_pollset_set* interested_parties,
- grpc_closure* on_done,
- grpc_resolved_addresses** addresses) {
+static void windows_resolve_address(const char* name, const char* default_port,
+ grpc_pollset_set* interested_parties,
+ grpc_closure* on_done,
+ grpc_resolved_addresses** addresses) {
request* r = (request*)gpr_malloc(sizeof(request));
GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
@@ -171,9 +160,6 @@ static void resolve_address_impl(const char* name, const char* default_port,
GRPC_CLOSURE_SCHED(&r->request_closure, GRPC_ERROR_NONE);
}
-void (*grpc_resolve_address)(
- const char* name, const char* default_port,
- grpc_pollset_set* interested_parties, grpc_closure* on_done,
- grpc_resolved_addresses** addresses) = resolve_address_impl;
-
+grpc_address_resolver_vtable grpc_windows_resolver_vtable = {
+ windows_resolve_address, windows_blocking_resolve_address};
#endif
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index 4e1c651278..89e8a39118 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -139,8 +139,4 @@ void grpc_resource_user_alloc_slices(
grpc_resource_user_slice_allocator* slice_allocator, size_t length,
size_t count, grpc_slice_buffer* dest);
-/* Allocate one slice of length \a size synchronously. */
-grpc_slice grpc_resource_user_slice_malloc(grpc_resource_user* resource_user,
- size_t size);
-
#endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */
diff --git a/src/core/lib/iomgr/sockaddr.h b/src/core/lib/iomgr/sockaddr.h
index 3b30da8a7d..5edf735cd1 100644
--- a/src/core/lib/iomgr/sockaddr.h
+++ b/src/core/lib/iomgr/sockaddr.h
@@ -25,18 +25,8 @@
#include <grpc/support/port_platform.h>
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_UV
-#include <uv.h>
-#endif
-
-#ifdef GPR_WINDOWS
-#include "src/core/lib/iomgr/sockaddr_windows.h"
-#endif
-
-#ifdef GRPC_POSIX_SOCKETADDR
+#include "src/core/lib/iomgr/sockaddr_custom.h"
#include "src/core/lib/iomgr/sockaddr_posix.h"
-#endif
+#include "src/core/lib/iomgr/sockaddr_windows.h"
#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_H */
diff --git a/src/core/lib/iomgr/sockaddr_custom.h b/src/core/lib/iomgr/sockaddr_custom.h
new file mode 100644
index 0000000000..d85cc504d3
--- /dev/null
+++ b/src/core/lib/iomgr/sockaddr_custom.h
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_SOCKADDR_CUSTOM_H
+#define GRPC_CORE_LIB_IOMGR_SOCKADDR_CUSTOM_H
+
+#include <grpc/support/port_platform.h>
+
+#include <stddef.h>
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_UV
+
+#include <uv.h>
+
+// TODO(kpayson) It would be nice to abstract this so we don't
+// depend on anything uv specific
+typedef struct sockaddr grpc_sockaddr;
+typedef struct sockaddr_in grpc_sockaddr_in;
+typedef struct in_addr grpc_in_addr;
+typedef struct sockaddr_in6 grpc_sockaddr_in6;
+typedef struct in6_addr grpc_in6_addr;
+
+#define GRPC_INET_ADDRSTRLEN INET_ADDRSTRLEN
+#define GRPC_INET6_ADDRSTRLEN INET6_ADDRSTRLEN
+
+#define GRPC_SOCK_STREAM SOCK_STREAM
+#define GRPC_SOCK_DGRAM SOCK_DGRAM
+
+#define GRPC_AF_UNSPEC AF_UNSPEC
+#define GRPC_AF_UNIX AF_UNIX
+#define GRPC_AF_INET AF_INET
+#define GRPC_AF_INET6 AF_INET6
+
+#define GRPC_AI_PASSIVE AI_PASSIVE
+
+#endif // GRPC_UV
+
+#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_CUSTOM_H */
diff --git a/src/core/lib/iomgr/sockaddr_posix.h b/src/core/lib/iomgr/sockaddr_posix.h
index 83981e0aa5..5b18bbc465 100644
--- a/src/core/lib/iomgr/sockaddr_posix.h
+++ b/src/core/lib/iomgr/sockaddr_posix.h
@@ -21,6 +21,9 @@
#include <grpc/support/port_platform.h>
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_POSIX_SOCKET
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
@@ -28,4 +31,25 @@
#include <sys/types.h>
#include <unistd.h>
+typedef struct sockaddr grpc_sockaddr;
+typedef struct sockaddr_in grpc_sockaddr_in;
+typedef struct in_addr grpc_in_addr;
+typedef struct sockaddr_in6 grpc_sockaddr_in6;
+typedef struct in6_addr grpc_in6_addr;
+
+#define GRPC_INET_ADDRSTRLEN INET_ADDRSTRLEN
+#define GRPC_INET6_ADDRSTRLEN INET6_ADDRSTRLEN
+
+#define GRPC_SOCK_STREAM SOCK_STREAM
+#define GRPC_SOCK_DGRAM SOCK_DGRAM
+
+#define GRPC_AF_UNSPEC AF_UNSPEC
+#define GRPC_AF_UNIX AF_UNIX
+#define GRPC_AF_INET AF_INET
+#define GRPC_AF_INET6 AF_INET6
+
+#define GRPC_AI_PASSIVE AI_PASSIVE
+
+#endif
+
#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_POSIX_H */
diff --git a/src/core/lib/iomgr/sockaddr_utils.cc b/src/core/lib/iomgr/sockaddr_utils.cc
index 88f9b2ffd9..bc3550a679 100644
--- a/src/core/lib/iomgr/sockaddr_utils.cc
+++ b/src/core/lib/iomgr/sockaddr_utils.cc
@@ -40,25 +40,25 @@ static const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0,
int grpc_sockaddr_is_v4mapped(const grpc_resolved_address* resolved_addr,
grpc_resolved_address* resolved_addr4_out) {
GPR_ASSERT(resolved_addr != resolved_addr4_out);
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
- struct sockaddr_in* addr4_out =
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
+ grpc_sockaddr_in* addr4_out =
resolved_addr4_out == nullptr
? nullptr
- : reinterpret_cast<struct sockaddr_in*>(resolved_addr4_out->addr);
- if (addr->sa_family == AF_INET6) {
- const struct sockaddr_in6* addr6 =
- reinterpret_cast<const struct sockaddr_in6*>(addr);
+ : reinterpret_cast<grpc_sockaddr_in*>(resolved_addr4_out->addr);
+ if (addr->sa_family == GRPC_AF_INET6) {
+ const grpc_sockaddr_in6* addr6 =
+ reinterpret_cast<const grpc_sockaddr_in6*>(addr);
if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix,
sizeof(kV4MappedPrefix)) == 0) {
if (resolved_addr4_out != nullptr) {
/* Normalize ::ffff:0.0.0.0/96 to IPv4. */
memset(resolved_addr4_out, 0, sizeof(*resolved_addr4_out));
- addr4_out->sin_family = AF_INET;
+ addr4_out->sin_family = GRPC_AF_INET;
/* s6_addr32 would be nice, but it's non-standard. */
memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4);
addr4_out->sin_port = addr6->sin6_port;
- resolved_addr4_out->len = sizeof(struct sockaddr_in);
+ resolved_addr4_out->len = sizeof(grpc_sockaddr_in);
}
return 1;
}
@@ -69,19 +69,19 @@ int grpc_sockaddr_is_v4mapped(const grpc_resolved_address* resolved_addr,
int grpc_sockaddr_to_v4mapped(const grpc_resolved_address* resolved_addr,
grpc_resolved_address* resolved_addr6_out) {
GPR_ASSERT(resolved_addr != resolved_addr6_out);
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
- struct sockaddr_in6* addr6_out =
- reinterpret_cast<struct sockaddr_in6*>(resolved_addr6_out->addr);
- if (addr->sa_family == AF_INET) {
- const struct sockaddr_in* addr4 =
- reinterpret_cast<const struct sockaddr_in*>(addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
+ grpc_sockaddr_in6* addr6_out =
+ reinterpret_cast<grpc_sockaddr_in6*>(resolved_addr6_out->addr);
+ if (addr->sa_family == GRPC_AF_INET) {
+ const grpc_sockaddr_in* addr4 =
+ reinterpret_cast<const grpc_sockaddr_in*>(addr);
memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out));
- addr6_out->sin6_family = AF_INET6;
+ addr6_out->sin6_family = GRPC_AF_INET6;
memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12);
memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4);
addr6_out->sin6_port = addr4->sin_port;
- resolved_addr6_out->len = sizeof(struct sockaddr_in6);
+ resolved_addr6_out->len = sizeof(grpc_sockaddr_in6);
return 1;
}
return 0;
@@ -89,32 +89,32 @@ int grpc_sockaddr_to_v4mapped(const grpc_resolved_address* resolved_addr,
int grpc_sockaddr_is_wildcard(const grpc_resolved_address* resolved_addr,
int* port_out) {
- const struct sockaddr* addr;
+ const grpc_sockaddr* addr;
grpc_resolved_address addr4_normalized;
if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr4_normalized)) {
resolved_addr = &addr4_normalized;
}
- addr = reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
- if (addr->sa_family == AF_INET) {
+ addr = reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
+ if (addr->sa_family == GRPC_AF_INET) {
/* Check for 0.0.0.0 */
- const struct sockaddr_in* addr4 =
- reinterpret_cast<const struct sockaddr_in*>(addr);
+ const grpc_sockaddr_in* addr4 =
+ reinterpret_cast<const grpc_sockaddr_in*>(addr);
if (addr4->sin_addr.s_addr != 0) {
return 0;
}
- *port_out = ntohs(addr4->sin_port);
+ *port_out = grpc_ntohs(addr4->sin_port);
return 1;
- } else if (addr->sa_family == AF_INET6) {
+ } else if (addr->sa_family == GRPC_AF_INET6) {
/* Check for :: */
- const struct sockaddr_in6* addr6 =
- reinterpret_cast<const struct sockaddr_in6*>(addr);
+ const grpc_sockaddr_in6* addr6 =
+ reinterpret_cast<const grpc_sockaddr_in6*>(addr);
int i;
for (i = 0; i < 16; i++) {
if (addr6->sin6_addr.s6_addr[i] != 0) {
return 0;
}
}
- *port_out = ntohs(addr6->sin6_port);
+ *port_out = grpc_ntohs(addr6->sin6_port);
return 1;
} else {
return 0;
@@ -129,33 +129,33 @@ void grpc_sockaddr_make_wildcards(int port, grpc_resolved_address* wild4_out,
void grpc_sockaddr_make_wildcard4(int port,
grpc_resolved_address* resolved_wild_out) {
- struct sockaddr_in* wild_out =
- reinterpret_cast<struct sockaddr_in*>(resolved_wild_out->addr);
+ grpc_sockaddr_in* wild_out =
+ reinterpret_cast<grpc_sockaddr_in*>(resolved_wild_out->addr);
GPR_ASSERT(port >= 0 && port < 65536);
memset(resolved_wild_out, 0, sizeof(*resolved_wild_out));
- wild_out->sin_family = AF_INET;
- wild_out->sin_port = htons(static_cast<uint16_t>(port));
- resolved_wild_out->len = sizeof(struct sockaddr_in);
+ wild_out->sin_family = GRPC_AF_INET;
+ wild_out->sin_port = grpc_htons(static_cast<uint16_t>(port));
+ resolved_wild_out->len = sizeof(grpc_sockaddr_in);
}
void grpc_sockaddr_make_wildcard6(int port,
grpc_resolved_address* resolved_wild_out) {
- struct sockaddr_in6* wild_out =
- reinterpret_cast<struct sockaddr_in6*>(resolved_wild_out->addr);
+ grpc_sockaddr_in6* wild_out =
+ reinterpret_cast<grpc_sockaddr_in6*>(resolved_wild_out->addr);
GPR_ASSERT(port >= 0 && port < 65536);
memset(resolved_wild_out, 0, sizeof(*resolved_wild_out));
- wild_out->sin6_family = AF_INET6;
- wild_out->sin6_port = htons(static_cast<uint16_t>(port));
- resolved_wild_out->len = sizeof(struct sockaddr_in6);
+ wild_out->sin6_family = GRPC_AF_INET6;
+ wild_out->sin6_port = grpc_htons(static_cast<uint16_t>(port));
+ resolved_wild_out->len = sizeof(grpc_sockaddr_in6);
}
int grpc_sockaddr_to_string(char** out,
const grpc_resolved_address* resolved_addr,
int normalize) {
- const struct sockaddr* addr;
+ const grpc_sockaddr* addr;
const int save_errno = errno;
grpc_resolved_address addr_normalized;
- char ntop_buf[INET6_ADDRSTRLEN];
+ char ntop_buf[GRPC_INET6_ADDRSTRLEN];
const void* ip = nullptr;
int port = 0;
uint32_t sin6_scope_id = 0;
@@ -165,17 +165,17 @@ int grpc_sockaddr_to_string(char** out,
if (normalize && grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) {
resolved_addr = &addr_normalized;
}
- addr = reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
- if (addr->sa_family == AF_INET) {
- const struct sockaddr_in* addr4 =
- reinterpret_cast<const struct sockaddr_in*>(addr);
+ addr = reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
+ if (addr->sa_family == GRPC_AF_INET) {
+ const grpc_sockaddr_in* addr4 =
+ reinterpret_cast<const grpc_sockaddr_in*>(addr);
ip = &addr4->sin_addr;
- port = ntohs(addr4->sin_port);
- } else if (addr->sa_family == AF_INET6) {
- const struct sockaddr_in6* addr6 =
- reinterpret_cast<const struct sockaddr_in6*>(addr);
+ port = grpc_ntohs(addr4->sin_port);
+ } else if (addr->sa_family == GRPC_AF_INET6) {
+ const grpc_sockaddr_in6* addr6 =
+ reinterpret_cast<const grpc_sockaddr_in6*>(addr);
ip = &addr6->sin6_addr;
- port = ntohs(addr6->sin6_port);
+ port = grpc_ntohs(addr6->sin6_port);
sin6_scope_id = addr6->sin6_scope_id;
}
if (ip != nullptr && grpc_inet_ntop(addr->sa_family, ip, ntop_buf,
@@ -197,6 +197,22 @@ int grpc_sockaddr_to_string(char** out,
return ret;
}
+void grpc_string_to_sockaddr(grpc_resolved_address* out, char* addr, int port) {
+ grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)out->addr;
+ grpc_sockaddr_in* addr4 = (grpc_sockaddr_in*)out->addr;
+
+ if (grpc_inet_pton(GRPC_AF_INET6, addr, &addr6->sin6_addr) == 1) {
+ addr6->sin6_family = GRPC_AF_INET6;
+ out->len = sizeof(grpc_sockaddr_in6);
+ } else if (grpc_inet_pton(GRPC_AF_INET, addr, &addr4->sin_addr) == 1) {
+ addr4->sin_family = GRPC_AF_INET;
+ out->len = sizeof(grpc_sockaddr_in);
+ } else {
+ GPR_ASSERT(0);
+ }
+ grpc_sockaddr_set_port(out, port);
+}
+
char* grpc_sockaddr_to_uri(const grpc_resolved_address* resolved_addr) {
grpc_resolved_address addr_normalized;
if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) {
@@ -219,33 +235,33 @@ char* grpc_sockaddr_to_uri(const grpc_resolved_address* resolved_addr) {
const char* grpc_sockaddr_get_uri_scheme(
const grpc_resolved_address* resolved_addr) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
switch (addr->sa_family) {
- case AF_INET:
+ case GRPC_AF_INET:
return "ipv4";
- case AF_INET6:
+ case GRPC_AF_INET6:
return "ipv6";
- case AF_UNIX:
+ case GRPC_AF_UNIX:
return "unix";
}
return nullptr;
}
int grpc_sockaddr_get_family(const grpc_resolved_address* resolved_addr) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
return addr->sa_family;
}
int grpc_sockaddr_get_port(const grpc_resolved_address* resolved_addr) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
switch (addr->sa_family) {
- case AF_INET:
- return ntohs(((struct sockaddr_in*)addr)->sin_port);
- case AF_INET6:
- return ntohs(((struct sockaddr_in6*)addr)->sin6_port);
+ case GRPC_AF_INET:
+ return grpc_ntohs(((grpc_sockaddr_in*)addr)->sin_port);
+ case GRPC_AF_INET6:
+ return grpc_ntohs(((grpc_sockaddr_in6*)addr)->sin6_port);
default:
if (grpc_is_unix_socket(resolved_addr)) {
return 1;
@@ -258,18 +274,18 @@ int grpc_sockaddr_get_port(const grpc_resolved_address* resolved_addr) {
int grpc_sockaddr_set_port(const grpc_resolved_address* resolved_addr,
int port) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
switch (addr->sa_family) {
- case AF_INET:
+ case GRPC_AF_INET:
GPR_ASSERT(port >= 0 && port < 65536);
- ((struct sockaddr_in*)addr)->sin_port =
- htons(static_cast<uint16_t>(port));
+ ((grpc_sockaddr_in*)addr)->sin_port =
+ grpc_htons(static_cast<uint16_t>(port));
return 1;
- case AF_INET6:
+ case GRPC_AF_INET6:
GPR_ASSERT(port >= 0 && port < 65536);
- ((struct sockaddr_in6*)addr)->sin6_port =
- htons(static_cast<uint16_t>(port));
+ ((grpc_sockaddr_in6*)addr)->sin6_port =
+ grpc_htons(static_cast<uint16_t>(port));
return 1;
default:
gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port",
diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h
index ace54a2a80..a4e90a73ab 100644
--- a/src/core/lib/iomgr/sockaddr_utils.h
+++ b/src/core/lib/iomgr/sockaddr_utils.h
@@ -71,6 +71,8 @@ int grpc_sockaddr_set_port(const grpc_resolved_address* addr, int port);
int grpc_sockaddr_to_string(char** out, const grpc_resolved_address* addr,
int normalize);
+void grpc_string_to_sockaddr(grpc_resolved_address* out, char* addr, int port);
+
/* Returns the URI string corresponding to \a addr */
char* grpc_sockaddr_to_uri(const grpc_resolved_address* addr);
diff --git a/src/core/lib/iomgr/sockaddr_windows.h b/src/core/lib/iomgr/sockaddr_windows.h
index 3a4fcc9e8a..4d637251a1 100644
--- a/src/core/lib/iomgr/sockaddr_windows.h
+++ b/src/core/lib/iomgr/sockaddr_windows.h
@@ -31,6 +31,25 @@
// must be included after the above
#include <mswsock.h>
+typedef struct sockaddr grpc_sockaddr;
+typedef struct sockaddr_in grpc_sockaddr_in;
+typedef struct in_addr grpc_in_addr;
+typedef struct sockaddr_in6 grpc_sockaddr_in6;
+typedef struct in6_addr grpc_in6_addr;
+
+#define GRPC_INET_ADDRSTRLEN INET_ADDRSTRLEN
+#define GRPC_INET6_ADDRSTRLEN INET6_ADDRSTRLEN
+
+#define GRPC_SOCK_STREAM SOCK_STREAM
+#define GRPC_SOCK_DGRAM SOCK_DGRAM
+
+#define GRPC_AF_UNSPEC AF_UNSPEC
+#define GRPC_AF_UNIX AF_UNIX
+#define GRPC_AF_INET AF_INET
+#define GRPC_AF_INET6 AF_INET6
+
+#define GRPC_AI_PASSIVE AI_PASSIVE
+
#endif
#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_WINDOWS_H */
diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h
index e96eb97a7e..cf1a7be648 100644
--- a/src/core/lib/iomgr/socket_utils.h
+++ b/src/core/lib/iomgr/socket_utils.h
@@ -23,6 +23,15 @@
#include <stddef.h>
+/* A wrapper for htons on POSIX and Windows */
+uint16_t grpc_htons(uint16_t hostshort);
+
+/* A wrapper for ntohs on POSIX and WINDOWS */
+uint16_t grpc_ntohs(uint16_t netshort);
+
+/* A wrapper for inet_pton on POSIX and WINDOWS */
+int grpc_inet_pton(int af, const char* src, void* dst);
+
/* A wrapper for inet_ntop on POSIX systems and InetNtop on Windows systems */
const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size);
diff --git a/src/core/lib/iomgr/socket_utils_common_posix.cc b/src/core/lib/iomgr/socket_utils_common_posix.cc
index 4fb6c7ad63..c52e237fa8 100644
--- a/src/core/lib/iomgr/socket_utils_common_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_common_posix.cc
@@ -43,6 +43,7 @@
#include "src/core/lib/gpr/host_port.h"
#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
/* set a socket to non blocking mode */
@@ -215,12 +216,11 @@ static void probe_ipv6_once(void) {
if (fd < 0) {
gpr_log(GPR_INFO, "Disabling AF_INET6 sockets because socket() failed.");
} else {
- struct sockaddr_in6 addr;
+ grpc_sockaddr_in6 addr;
memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
addr.sin6_addr.s6_addr[15] = 1; /* [::1]:0 */
- if (bind(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) ==
- 0) {
+ if (bind(fd, reinterpret_cast<grpc_sockaddr*>(&addr), sizeof(addr)) == 0) {
g_ipv6_loopback_available = 1;
} else {
gpr_log(GPR_INFO,
@@ -280,8 +280,8 @@ static int create_socket(grpc_socket_factory* factory, int domain, int type,
grpc_error* grpc_create_dualstack_socket_using_factory(
grpc_socket_factory* factory, const grpc_resolved_address* resolved_addr,
int type, int protocol, grpc_dualstack_mode* dsmode, int* newfd) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
int family = addr->sa_family;
if (family == AF_INET6) {
if (grpc_ipv6_loopback_available()) {
@@ -311,6 +311,14 @@ grpc_error* grpc_create_dualstack_socket_using_factory(
return error_for_fd(*newfd, resolved_addr);
}
+uint16_t grpc_htons(uint16_t hostshort) { return htons(hostshort); }
+
+uint16_t grpc_ntohs(uint16_t netshort) { return ntohs(netshort); }
+
+int grpc_inet_pton(int af, const char* src, void* dst) {
+ return inet_pton(af, src, dst);
+}
+
const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size) {
GPR_ASSERT(size <= (socklen_t)-1);
return inet_ntop(af, src, dst, static_cast<socklen_t>(size));
diff --git a/src/core/lib/iomgr/socket_utils_linux.cc b/src/core/lib/iomgr/socket_utils_linux.cc
index deb7c55267..1364cd35f6 100644
--- a/src/core/lib/iomgr/socket_utils_linux.cc
+++ b/src/core/lib/iomgr/socket_utils_linux.cc
@@ -37,8 +37,7 @@ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock,
GPR_ASSERT(resolved_addr->len <= (socklen_t)-1);
flags |= nonblock ? SOCK_NONBLOCK : 0;
flags |= cloexec ? SOCK_CLOEXEC : 0;
- return accept4(sockfd,
- reinterpret_cast<struct sockaddr*>(resolved_addr->addr),
+ return accept4(sockfd, reinterpret_cast<grpc_sockaddr*>(resolved_addr->addr),
reinterpret_cast<socklen_t*>(&resolved_addr->len), flags);
}
diff --git a/src/core/lib/iomgr/socket_utils_posix.cc b/src/core/lib/iomgr/socket_utils_posix.cc
index c856f641e3..d5d00af976 100644
--- a/src/core/lib/iomgr/socket_utils_posix.cc
+++ b/src/core/lib/iomgr/socket_utils_posix.cc
@@ -36,7 +36,7 @@ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock,
int fd, flags;
GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t));
GPR_ASSERT(resolved_addr->len <= (socklen_t)-1);
- fd = accept(sockfd, (struct sockaddr*)resolved_addr->addr,
+ fd = accept(sockfd, (grpc_sockaddr*)resolved_addr->addr,
(socklen_t*)&resolved_addr->len);
if (fd >= 0) {
if (nonblock) {
diff --git a/src/core/lib/iomgr/socket_utils_uv.cc b/src/core/lib/iomgr/socket_utils_uv.cc
index 3f650eef66..8538abc7e4 100644
--- a/src/core/lib/iomgr/socket_utils_uv.cc
+++ b/src/core/lib/iomgr/socket_utils_uv.cc
@@ -22,15 +22,24 @@
#ifdef GRPC_UV
-#include <uv.h>
-
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/socket_utils.h"
#include <grpc/support/log.h>
+#include <uv.h>
+
+uint16_t grpc_htons(uint16_t hostshort) { return htons(hostshort); }
+
+uint16_t grpc_ntohs(uint16_t netshort) { return ntohs(netshort); }
+
+int grpc_inet_pton(int af, const char* src, void* dst) {
+ return inet_pton(af, src, dst);
+}
+
const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size) {
- uv_inet_ntop(af, src, dst, size);
- return dst;
+ /* Windows InetNtopA wants a mutable ip pointer */
+ return inet_ntop(af, src, dst, (socklen_t)size);
}
#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/socket_utils_windows.cc b/src/core/lib/iomgr/socket_utils_windows.cc
index 5fc3b7617e..3e7b5b812d 100644
--- a/src/core/lib/iomgr/socket_utils_windows.cc
+++ b/src/core/lib/iomgr/socket_utils_windows.cc
@@ -27,6 +27,14 @@
#include <grpc/support/log.h>
+uint16_t grpc_htons(uint16_t hostshort) { return htons(hostshort); }
+
+uint16_t grpc_ntohs(uint16_t netshort) { return ntohs(netshort); }
+
+int grpc_inet_pton(int af, const char* src, void* dst) {
+ return inet_pton(af, src, dst);
+}
+
const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size) {
/* Windows InetNtopA wants a mutable ip pointer */
return InetNtopA(af, (void*)src, dst, size);
diff --git a/src/core/lib/iomgr/tcp_client.cc b/src/core/lib/iomgr/tcp_client.cc
new file mode 100644
index 0000000000..6c0ba40781
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client.cc
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/tcp_client.h"
+
+grpc_tcp_client_vtable* grpc_tcp_client_impl;
+
+void grpc_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* addr,
+ grpc_millis deadline) {
+ grpc_tcp_client_impl->connect(closure, ep, interested_parties, channel_args,
+ addr, deadline);
+}
+
+void grpc_set_tcp_client_impl(grpc_tcp_client_vtable* impl) {
+ grpc_tcp_client_impl = impl;
+}
diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h
index a6b99e63c2..d209eeb8c2 100644
--- a/src/core/lib/iomgr/tcp_client.h
+++ b/src/core/lib/iomgr/tcp_client.h
@@ -27,6 +27,13 @@
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolve_address.h"
+typedef struct grpc_tcp_client_vtable {
+ void (*connect)(grpc_closure* on_connect, grpc_endpoint** endpoint,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* addr, grpc_millis deadline);
+} grpc_tcp_client_vtable;
+
/* Asynchronously connect to an address (specified as (addr, len)), and call
cb with arg and the completed connection when done (or call cb with arg and
NULL on failure).
@@ -38,4 +45,8 @@ void grpc_tcp_client_connect(grpc_closure* on_connect, grpc_endpoint** endpoint,
const grpc_resolved_address* addr,
grpc_millis deadline);
+void grpc_tcp_client_global_init();
+
+void grpc_set_tcp_client_impl(grpc_tcp_client_vtable* impl);
+
#endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */
diff --git a/src/core/lib/iomgr/tcp_client_custom.cc b/src/core/lib/iomgr/tcp_client_custom.cc
new file mode 100644
index 0000000000..55632a55a1
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_client_custom.cc
@@ -0,0 +1,151 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_custom.h"
+#include "src/core/lib/iomgr/timer.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+extern grpc_socket_vtable* grpc_custom_socket_vtable;
+
+struct grpc_custom_tcp_connect {
+ grpc_custom_socket* socket;
+ grpc_timer alarm;
+ grpc_closure on_alarm;
+ grpc_closure* closure;
+ grpc_endpoint** endpoint;
+ int refs;
+ char* addr_name;
+ grpc_resource_quota* resource_quota;
+};
+
+static void custom_tcp_connect_cleanup(grpc_custom_tcp_connect* connect) {
+ grpc_custom_socket* socket = connect->socket;
+ grpc_resource_quota_unref_internal(connect->resource_quota);
+ gpr_free(connect->addr_name);
+ gpr_free(connect);
+ socket->refs--;
+ if (socket->refs == 0) {
+ grpc_custom_socket_vtable->destroy(socket);
+ gpr_free(socket);
+ }
+}
+
+static void custom_close_callback(grpc_custom_socket* socket) {}
+
+static void on_alarm(void* acp, grpc_error* error) {
+ int done;
+ grpc_custom_socket* socket = (grpc_custom_socket*)acp;
+ grpc_custom_tcp_connect* connect = socket->connector;
+ if (grpc_tcp_trace.enabled()) {
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
+ connect->addr_name, str);
+ }
+ if (error == GRPC_ERROR_NONE) {
+ /* error == NONE implies that the timer ran out, and wasn't cancelled. If
+ it was cancelled, then the handler that cancelled it also should close
+ the handle, if applicable */
+ grpc_custom_socket_vtable->close(socket, custom_close_callback);
+ }
+ done = (--connect->refs == 0);
+ if (done) {
+ custom_tcp_connect_cleanup(connect);
+ }
+}
+
+static void custom_connect_callback(grpc_custom_socket* socket,
+ grpc_error* error) {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_custom_tcp_connect* connect = socket->connector;
+ int done;
+ grpc_closure* closure = connect->closure;
+ grpc_timer_cancel(&connect->alarm);
+ if (error == GRPC_ERROR_NONE) {
+ *connect->endpoint = custom_tcp_endpoint_create(
+ socket, connect->resource_quota, connect->addr_name);
+ }
+ done = (--connect->refs == 0);
+ if (done) {
+ grpc_core::ExecCtx::Get()->Flush();
+ custom_tcp_connect_cleanup(connect);
+ }
+ GRPC_CLOSURE_SCHED(closure, error);
+}
+
+static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* resolved_addr,
+ grpc_millis deadline) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ (void)channel_args;
+ (void)interested_parties;
+ grpc_custom_tcp_connect* connect;
+ grpc_resource_quota* resource_quota = grpc_resource_quota_create(nullptr);
+ if (channel_args != nullptr) {
+ for (size_t i = 0; i < channel_args->num_args; i++) {
+ if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
+ grpc_resource_quota_unref_internal(resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
+ (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
+ }
+ }
+ }
+ grpc_custom_socket* socket =
+ (grpc_custom_socket*)gpr_malloc(sizeof(grpc_custom_socket));
+ socket->refs = 2;
+ grpc_custom_socket_vtable->init(socket, GRPC_AF_UNSPEC);
+ connect =
+ (grpc_custom_tcp_connect*)gpr_malloc(sizeof(grpc_custom_tcp_connect));
+ connect->closure = closure;
+ connect->endpoint = ep;
+ connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
+ connect->resource_quota = resource_quota;
+ connect->socket = socket;
+ socket->connector = connect;
+ socket->endpoint = nullptr;
+ socket->listener = nullptr;
+ connect->refs = 2;
+
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %p %s: asynchronously connecting",
+ socket, connect->addr_name);
+ }
+
+ grpc_custom_socket_vtable->connect(
+ socket, (const grpc_sockaddr*)resolved_addr->addr, resolved_addr->len,
+ custom_connect_callback);
+ GRPC_CLOSURE_INIT(&connect->on_alarm, on_alarm, socket,
+ grpc_schedule_on_exec_ctx);
+ grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
+}
+
+grpc_tcp_client_vtable custom_tcp_client_vtable = {tcp_connect};
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 3fe2989c6b..c21fb40ab1 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -38,6 +38,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_posix.h"
+#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/socket_mutator.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
@@ -293,7 +294,7 @@ void grpc_tcp_client_create_from_prepared_fd(
async_connect* ac;
do {
GPR_ASSERT(addr->len < ~(socklen_t)0);
- err = connect(fd, reinterpret_cast<const struct sockaddr*>(addr->addr),
+ err = connect(fd, reinterpret_cast<const grpc_sockaddr*>(addr->addr),
static_cast<socklen_t>(addr->len));
} while (err < 0 && errno == EINTR);
if (err >= 0) {
@@ -336,11 +337,11 @@ void grpc_tcp_client_create_from_prepared_fd(
gpr_mu_unlock(&ac->mu);
}
-static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties,
- const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) {
+static void tcp_connect(grpc_closure* closure, grpc_endpoint** ep,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* addr,
+ grpc_millis deadline) {
grpc_resolved_address mapped_addr;
grpc_fd* fdobj = nullptr;
grpc_error* error;
@@ -355,20 +356,5 @@ static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
ep);
}
-// overridden by api_fuzzer.c
-void (*grpc_tcp_client_connect_impl)(
- grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) = tcp_client_connect_impl;
-
-void grpc_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties,
- const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) {
- grpc_tcp_client_connect_impl(closure, ep, interested_parties, channel_args,
- addr, deadline);
-}
-
+grpc_tcp_client_vtable grpc_posix_tcp_client_vtable = {tcp_connect};
#endif
diff --git a/src/core/lib/iomgr/tcp_client_uv.cc b/src/core/lib/iomgr/tcp_client_uv.cc
deleted file mode 100644
index d29d6c8f41..0000000000
--- a/src/core/lib/iomgr/tcp_client_uv.cc
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_UV
-
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/lib/iomgr/error.h"
-#include "src/core/lib/iomgr/iomgr_uv.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/iomgr/tcp_client.h"
-#include "src/core/lib/iomgr/tcp_uv.h"
-#include "src/core/lib/iomgr/timer.h"
-
-extern grpc_core::TraceFlag grpc_tcp_trace;
-
-typedef struct grpc_uv_tcp_connect {
- uv_connect_t connect_req;
- grpc_timer alarm;
- grpc_closure on_alarm;
- uv_tcp_t* tcp_handle;
- grpc_closure* closure;
- grpc_endpoint** endpoint;
- int refs;
- char* addr_name;
- grpc_resource_quota* resource_quota;
-} grpc_uv_tcp_connect;
-
-static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect* connect) {
- grpc_resource_quota_unref_internal(connect->resource_quota);
- gpr_free(connect->addr_name);
- gpr_free(connect);
-}
-
-static void tcp_close_callback(uv_handle_t* handle) { gpr_free(handle); }
-
-static void uv_tc_on_alarm(void* acp, grpc_error* error) {
- int done;
- grpc_uv_tcp_connect* connect = (grpc_uv_tcp_connect*)acp;
- if (grpc_tcp_trace.enabled()) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
- connect->addr_name, str);
- }
- if (error == GRPC_ERROR_NONE) {
- /* error == NONE implies that the timer ran out, and wasn't cancelled. If
- it was cancelled, then the handler that cancelled it also should close
- the handle, if applicable */
- uv_close((uv_handle_t*)connect->tcp_handle, tcp_close_callback);
- }
- done = (--connect->refs == 0);
- if (done) {
- uv_tcp_connect_cleanup(connect);
- }
-}
-
-static void uv_tc_on_connect(uv_connect_t* req, int status) {
- grpc_uv_tcp_connect* connect = (grpc_uv_tcp_connect*)req->data;
- grpc_core::ExecCtx exec_ctx;
- grpc_error* error = GRPC_ERROR_NONE;
- int done;
- grpc_closure* closure = connect->closure;
- grpc_timer_cancel(&connect->alarm);
- if (status == 0) {
- *connect->endpoint = grpc_tcp_create(
- connect->tcp_handle, connect->resource_quota, connect->addr_name);
- } else {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Failed to connect to remote host");
- error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status);
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- if (status == UV_ECANCELED) {
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string("Timeout occurred"));
- // This should only happen if the handle is already closed
- } else {
- error = grpc_error_set_str(
- error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- uv_close((uv_handle_t*)connect->tcp_handle, tcp_close_callback);
- }
- }
- done = (--connect->refs == 0);
- if (done) {
- grpc_core::ExecCtx::Get()->Flush();
- uv_tcp_connect_cleanup(connect);
- }
- GRPC_CLOSURE_SCHED(closure, error);
-}
-
-static void tcp_client_connect_impl(grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties,
- const grpc_channel_args* channel_args,
- const grpc_resolved_address* resolved_addr,
- grpc_millis deadline) {
- grpc_uv_tcp_connect* connect;
- grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL);
- (void)channel_args;
- (void)interested_parties;
-
- GRPC_UV_ASSERT_SAME_THREAD();
-
- if (channel_args != NULL) {
- for (size_t i = 0; i < channel_args->num_args; i++) {
- if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_unref_internal(resource_quota);
- resource_quota = grpc_resource_quota_ref_internal(
- (grpc_resource_quota*)channel_args->args[i].value.pointer.p);
- }
- }
- }
-
- connect = (grpc_uv_tcp_connect*)gpr_zalloc(sizeof(grpc_uv_tcp_connect));
- connect->closure = closure;
- connect->endpoint = ep;
- connect->tcp_handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
- connect->addr_name = grpc_sockaddr_to_uri(resolved_addr);
- connect->resource_quota = resource_quota;
- uv_tcp_init(uv_default_loop(), connect->tcp_handle);
- connect->connect_req.data = connect;
- connect->refs = 2; // One for the connect operation, one for the timer.
-
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting",
- connect->addr_name);
- }
-
- // TODO(murgatroid99): figure out what the return value here means
- uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
- (const struct sockaddr*)resolved_addr->addr, uv_tc_on_connect);
- GRPC_CLOSURE_INIT(&connect->on_alarm, uv_tc_on_alarm, connect,
- grpc_schedule_on_exec_ctx);
- grpc_timer_init(&connect->alarm, deadline, &connect->on_alarm);
-}
-
-// overridden by api_fuzzer.c
-void (*grpc_tcp_client_connect_impl)(
- grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) = tcp_client_connect_impl;
-
-void grpc_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties,
- const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) {
- grpc_tcp_client_connect_impl(closure, ep, interested_parties, channel_args,
- addr, deadline);
-}
-
-#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_client_windows.cc b/src/core/lib/iomgr/tcp_client_windows.cc
index 70c2495350..e5b5502597 100644
--- a/src/core/lib/iomgr/tcp_client_windows.cc
+++ b/src/core/lib/iomgr/tcp_client_windows.cc
@@ -122,12 +122,11 @@ static void on_connect(void* acp, grpc_error* error) {
/* Tries to issue one async connection, then schedules both an IOCP
notification request for the connection, and one timeout alert. */
-static void tcp_client_connect_impl(grpc_closure* on_done,
- grpc_endpoint** endpoint,
- grpc_pollset_set* interested_parties,
- const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) {
+static void tcp_connect(grpc_closure* on_done, grpc_endpoint** endpoint,
+ grpc_pollset_set* interested_parties,
+ const grpc_channel_args* channel_args,
+ const grpc_resolved_address* addr,
+ grpc_millis deadline) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
int status;
@@ -175,7 +174,7 @@ static void tcp_client_connect_impl(grpc_closure* on_done,
grpc_sockaddr_make_wildcard6(0, &local_address);
status =
- bind(sock, (struct sockaddr*)&local_address.addr, (int)local_address.len);
+ bind(sock, (grpc_sockaddr*)&local_address.addr, (int)local_address.len);
if (status != 0) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure;
@@ -183,7 +182,7 @@ static void tcp_client_connect_impl(grpc_closure* on_done,
socket = grpc_winsocket_create(sock, "client");
info = &socket->write_info;
- success = ConnectEx(sock, (struct sockaddr*)&addr->addr, (int)addr->len, NULL,
+ success = ConnectEx(sock, (grpc_sockaddr*)&addr->addr, (int)addr->len, NULL,
0, NULL, &info->overlapped);
/* It wouldn't be unusual to get a success immediately. But we'll still get
@@ -227,20 +226,6 @@ failure:
GRPC_CLOSURE_SCHED(on_done, final_error);
}
-// overridden by api_fuzzer.c
-void (*grpc_tcp_client_connect_impl)(
- grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties, const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) = tcp_client_connect_impl;
-
-void grpc_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* interested_parties,
- const grpc_channel_args* channel_args,
- const grpc_resolved_address* addr,
- grpc_millis deadline) {
- grpc_tcp_client_connect_impl(closure, ep, interested_parties, channel_args,
- addr, deadline);
-}
+grpc_tcp_client_vtable grpc_windows_tcp_client_vtable = {tcp_connect};
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc
new file mode 100644
index 0000000000..2b1fc93028
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_custom.cc
@@ -0,0 +1,365 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include <grpc/slice_buffer.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/network_status_tracker.h"
+#include "src/core/lib/iomgr/resource_quota.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_custom.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+
+#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+grpc_socket_vtable* grpc_custom_socket_vtable = nullptr;
+extern grpc_tcp_server_vtable custom_tcp_server_vtable;
+extern grpc_tcp_client_vtable custom_tcp_client_vtable;
+
+void grpc_custom_endpoint_init(grpc_socket_vtable* impl) {
+ grpc_custom_socket_vtable = impl;
+ grpc_set_tcp_client_impl(&custom_tcp_client_vtable);
+ grpc_set_tcp_server_impl(&custom_tcp_server_vtable);
+}
+
+typedef struct {
+ grpc_endpoint base;
+ gpr_refcount refcount;
+ grpc_custom_socket* socket;
+
+ grpc_closure* read_cb;
+ grpc_closure* write_cb;
+
+ grpc_slice_buffer* read_slices;
+ grpc_slice_buffer* write_slices;
+
+ grpc_resource_user* resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
+
+ bool shutting_down;
+
+ char* peer_string;
+} custom_tcp_endpoint;
+
+static void tcp_free(grpc_custom_socket* s) {
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)s->endpoint;
+ grpc_resource_user_unref(tcp->resource_user);
+ gpr_free(tcp->peer_string);
+ gpr_free(tcp);
+ s->refs--;
+ if (s->refs == 0) {
+ grpc_custom_socket_vtable->destroy(s);
+ gpr_free(s);
+ }
+}
+
+#ifndef NDEBUG
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
+#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
+static void tcp_unref(custom_tcp_endpoint* tcp, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_ERROR,
+ "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp->socket, reason,
+ val, val - 1);
+ }
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp->socket);
+ }
+}
+
+static void tcp_ref(custom_tcp_endpoint* tcp, const char* reason,
+ const char* file, int line) {
+ if (grpc_tcp_trace.enabled()) {
+ gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_ERROR,
+ "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp->socket, reason,
+ val, val + 1);
+ }
+ gpr_ref(&tcp->refcount);
+}
+#else
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
+#define TCP_REF(tcp, reason) tcp_ref((tcp))
+static void tcp_unref(custom_tcp_endpoint* tcp) {
+ if (gpr_unref(&tcp->refcount)) {
+ tcp_free(tcp->socket);
+ }
+}
+
+static void tcp_ref(custom_tcp_endpoint* tcp) { gpr_ref(&tcp->refcount); }
+#endif
+
+static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) {
+ grpc_closure* cb = tcp->read_cb;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp->socket, cb, cb->cb,
+ cb->cb_arg);
+ size_t i;
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+
+ for (i = 0; i < tcp->read_slices->count; i++) {
+ char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
+ gpr_free(dump);
+ }
+ }
+ TCP_UNREF(tcp, "read");
+ tcp->read_slices = nullptr;
+ tcp->read_cb = nullptr;
+ GRPC_CLOSURE_RUN(cb, error);
+}
+
+static void custom_read_callback(grpc_custom_socket* socket, size_t nread,
+ grpc_error* error) {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_slice_buffer garbage;
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)socket->endpoint;
+ if (error == GRPC_ERROR_NONE && nread == 0) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
+ }
+ if (error == GRPC_ERROR_NONE) {
+ // Successful read
+ if ((size_t)nread < tcp->read_slices->length) {
+ /* TODO(murgatroid99): Instead of discarding the unused part of the read
+ * buffer, reuse it as the next read buffer. */
+ grpc_slice_buffer_init(&garbage);
+ grpc_slice_buffer_trim_end(
+ tcp->read_slices, tcp->read_slices->length - (size_t)nread, &garbage);
+ grpc_slice_buffer_reset_and_unref_internal(&garbage);
+ }
+ } else {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ }
+ call_read_cb(tcp, error);
+}
+
+static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)tcpp;
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp->socket,
+ grpc_error_string(error));
+ }
+ if (error == GRPC_ERROR_NONE) {
+ /* Before calling read, we allocate a buffer with exactly one slice
+ * to tcp->read_slices and wait for the callback indicating that the
+ * allocation was successful. So slices[0] should always exist here */
+ char* buffer = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]);
+ size_t len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]);
+ grpc_custom_socket_vtable->read(tcp->socket, buffer, len,
+ custom_read_callback);
+ } else {
+ grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ call_read_cb(tcp, GRPC_ERROR_REF(error));
+ }
+ if (grpc_tcp_trace.enabled()) {
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp->socket, str);
+ }
+}
+
+static void endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
+ grpc_closure* cb) {
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ GPR_ASSERT(tcp->read_cb == nullptr);
+ tcp->read_cb = cb;
+ tcp->read_slices = read_slices;
+ grpc_slice_buffer_reset_and_unref_internal(read_slices);
+ TCP_REF(tcp, "read");
+ grpc_resource_user_alloc_slices(&tcp->slice_allocator,
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
+ tcp->read_slices);
+}
+
+static void custom_write_callback(grpc_custom_socket* socket,
+ grpc_error* error) {
+ grpc_core::ExecCtx exec_ctx;
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)socket->endpoint;
+ grpc_closure* cb = tcp->write_cb;
+ tcp->write_cb = nullptr;
+ if (grpc_tcp_trace.enabled()) {
+ const char* str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp->socket, str);
+ }
+ TCP_UNREF(tcp, "write");
+ GRPC_CLOSURE_SCHED(cb, error);
+}
+
+static void endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* write_slices,
+ grpc_closure* cb) {
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+
+ if (grpc_tcp_trace.enabled()) {
+ size_t j;
+
+ for (j = 0; j < write_slices->count; j++) {
+ char* data = grpc_dump_slice(write_slices->slices[j],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp->socket,
+ tcp->peer_string, data);
+ gpr_free(data);
+ }
+ }
+
+ if (tcp->shutting_down) {
+ GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "TCP socket is shutting down"));
+ return;
+ }
+
+ GPR_ASSERT(tcp->write_cb == nullptr);
+ tcp->write_slices = write_slices;
+ GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
+ if (tcp->write_slices->count == 0) {
+ // No slices means we don't have to do anything,
+ // and libuv doesn't like empty writes
+ GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
+ return;
+ }
+ tcp->write_cb = cb;
+ TCP_REF(tcp, "write");
+ grpc_custom_socket_vtable->write(tcp->socket, tcp->write_slices,
+ custom_write_callback);
+}
+
+static void endpoint_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
+ // No-op. We're ignoring pollsets currently
+ (void)ep;
+ (void)pollset;
+}
+
+static void endpoint_add_to_pollset_set(grpc_endpoint* ep,
+ grpc_pollset_set* pollset) {
+ // No-op. We're ignoring pollsets currently
+ (void)ep;
+ (void)pollset;
+}
+
+static void endpoint_delete_from_pollset_set(grpc_endpoint* ep,
+ grpc_pollset_set* pollset) {
+ // No-op. We're ignoring pollsets currently
+ (void)ep;
+ (void)pollset;
+}
+
+static void endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) {
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
+ if (!tcp->shutting_down) {
+ if (grpc_tcp_trace.enabled()) {
+ const char* str = grpc_error_string(why);
+ gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->socket, str);
+ }
+ tcp->shutting_down = true;
+ // GRPC_CLOSURE_SCHED(tcp->read_cb, GRPC_ERROR_REF(why));
+ // GRPC_CLOSURE_SCHED(tcp->write_cb, GRPC_ERROR_REF(why));
+ // tcp->read_cb = nullptr;
+ // tcp->write_cb = nullptr;
+ grpc_resource_user_shutdown(tcp->resource_user);
+ grpc_custom_socket_vtable->shutdown(tcp->socket);
+ }
+ GRPC_ERROR_UNREF(why);
+}
+
+static void custom_close_callback(grpc_custom_socket* socket) {
+ socket->refs--;
+ if (socket->refs == 0) {
+ grpc_custom_socket_vtable->destroy(socket);
+ gpr_free(socket);
+ } else if (socket->endpoint) {
+ grpc_core::ExecCtx exec_ctx;
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)socket->endpoint;
+ TCP_UNREF(tcp, "destroy");
+ }
+}
+
+static void endpoint_destroy(grpc_endpoint* ep) {
+ grpc_network_status_unregister_endpoint(ep);
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
+ grpc_custom_socket_vtable->close(tcp->socket, custom_close_callback);
+}
+
+static char* endpoint_get_peer(grpc_endpoint* ep) {
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
+ return gpr_strdup(tcp->peer_string);
+}
+
+static grpc_resource_user* endpoint_get_resource_user(grpc_endpoint* ep) {
+ custom_tcp_endpoint* tcp = (custom_tcp_endpoint*)ep;
+ return tcp->resource_user;
+}
+
+static int endpoint_get_fd(grpc_endpoint* ep) { return -1; }
+
+static grpc_endpoint_vtable vtable = {endpoint_read,
+ endpoint_write,
+ endpoint_add_to_pollset,
+ endpoint_add_to_pollset_set,
+ endpoint_delete_from_pollset_set,
+ endpoint_shutdown,
+ endpoint_destroy,
+ endpoint_get_resource_user,
+ endpoint_get_peer,
+ endpoint_get_fd};
+
+grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket,
+ grpc_resource_quota* resource_quota,
+ char* peer_string) {
+ custom_tcp_endpoint* tcp =
+ (custom_tcp_endpoint*)gpr_malloc(sizeof(custom_tcp_endpoint));
+ grpc_core::ExecCtx exec_ctx;
+
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", socket);
+ }
+ memset(tcp, 0, sizeof(custom_tcp_endpoint));
+ socket->refs++;
+ socket->endpoint = (grpc_endpoint*)tcp;
+ tcp->socket = socket;
+ tcp->base.vtable = &vtable;
+ gpr_ref_init(&tcp->refcount, 1);
+ tcp->peer_string = gpr_strdup(peer_string);
+ tcp->shutting_down = false;
+ tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(
+ &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
+ /* Tell network status tracking code about the new endpoint */
+ grpc_network_status_register_endpoint(&tcp->base);
+
+ return &tcp->base;
+}
diff --git a/src/core/lib/iomgr/tcp_custom.h b/src/core/lib/iomgr/tcp_custom.h
new file mode 100644
index 0000000000..22caa149f8
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_custom.h
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TCP_CUSTOM_H
+#define GRPC_CORE_LIB_IOMGR_TCP_CUSTOM_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+
+typedef struct grpc_tcp_listener grpc_tcp_listener;
+typedef struct grpc_custom_tcp_connect grpc_custom_tcp_connect;
+
+typedef struct grpc_custom_socket {
+ // Implementation defined
+ void* impl;
+ grpc_endpoint* endpoint;
+ grpc_tcp_listener* listener;
+ grpc_custom_tcp_connect* connector;
+ int refs;
+} grpc_custom_socket;
+
+typedef void (*grpc_custom_connect_callback)(grpc_custom_socket* socket,
+ grpc_error* error);
+typedef void (*grpc_custom_write_callback)(grpc_custom_socket* socket,
+ grpc_error* error);
+typedef void (*grpc_custom_read_callback)(grpc_custom_socket* socket,
+ size_t nread, grpc_error* error);
+typedef void (*grpc_custom_accept_callback)(grpc_custom_socket* socket,
+ grpc_custom_socket* client,
+ grpc_error* error);
+typedef void (*grpc_custom_close_callback)(grpc_custom_socket* socket);
+
+typedef struct grpc_socket_vtable {
+ grpc_error* (*init)(grpc_custom_socket* socket, int domain);
+ void (*connect)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
+ size_t len, grpc_custom_connect_callback cb);
+ void (*destroy)(grpc_custom_socket* socket);
+ void (*shutdown)(grpc_custom_socket* socket);
+ void (*close)(grpc_custom_socket* socket, grpc_custom_close_callback cb);
+ void (*write)(grpc_custom_socket* socket, grpc_slice_buffer* slices,
+ grpc_custom_write_callback cb);
+ void (*read)(grpc_custom_socket* socket, char* buffer, size_t length,
+ grpc_custom_read_callback cb);
+ grpc_error* (*getpeername)(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, int* len);
+ grpc_error* (*getsockname)(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, int* len);
+ grpc_error* (*setsockopt)(grpc_custom_socket* socket, int level, int optname,
+ const void* optval, uint32_t optlen);
+ grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr,
+ size_t len, int flags);
+ grpc_error* (*listen)(grpc_custom_socket* socket);
+ void (*accept)(grpc_custom_socket* socket, grpc_custom_socket* client,
+ grpc_custom_accept_callback cb);
+} grpc_socket_vtable;
+
+/* Internal APIs */
+void grpc_custom_endpoint_init(grpc_socket_vtable* impl);
+
+void grpc_custom_close_server_callback(grpc_tcp_listener* listener);
+
+grpc_endpoint* custom_tcp_endpoint_create(grpc_custom_socket* socket,
+ grpc_resource_quota* resource_quota,
+ char* peer_string);
+
+#endif /* GRPC_CORE_LIB_IOMGR_TCP_CUSTOM_H */
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index ca0046b83b..205af22531 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -63,7 +63,7 @@ typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type;
typedef size_t msg_iovlen_type;
#endif
-grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
+extern grpc_core::TraceFlag grpc_tcp_trace;
namespace {
struct grpc_tcp {
diff --git a/src/core/lib/iomgr/tcp_server.cc b/src/core/lib/iomgr/tcp_server.cc
new file mode 100644
index 0000000000..ea745f266b
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server.cc
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/tcp_server.h"
+
+grpc_tcp_server_vtable* grpc_tcp_server_impl;
+
+grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
+ const grpc_channel_args* args,
+ grpc_tcp_server** server) {
+ return grpc_tcp_server_impl->create(shutdown_complete, args, server);
+}
+
+void grpc_tcp_server_start(grpc_tcp_server* server, grpc_pollset** pollsets,
+ size_t pollset_count,
+ grpc_tcp_server_cb on_accept_cb, void* cb_arg) {
+ grpc_tcp_server_impl->start(server, pollsets, pollset_count, on_accept_cb,
+ cb_arg);
+}
+
+grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
+ const grpc_resolved_address* addr,
+ int* out_port) {
+ return grpc_tcp_server_impl->add_port(s, addr, out_port);
+}
+
+unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server* s,
+ unsigned port_index) {
+ return grpc_tcp_server_impl->port_fd_count(s, port_index);
+}
+
+int grpc_tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
+ unsigned fd_index) {
+ return grpc_tcp_server_impl->port_fd(s, port_index, fd_index);
+}
+
+grpc_tcp_server* grpc_tcp_server_ref(grpc_tcp_server* s) {
+ return grpc_tcp_server_impl->ref(s);
+}
+
+void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server* s,
+ grpc_closure* shutdown_starting) {
+ grpc_tcp_server_impl->shutdown_starting_add(s, shutdown_starting);
+}
+
+void grpc_tcp_server_unref(grpc_tcp_server* s) {
+ grpc_tcp_server_impl->unref(s);
+}
+
+void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) {
+ grpc_tcp_server_impl->shutdown_listeners(s);
+}
+
+void grpc_set_tcp_server_impl(grpc_tcp_server_vtable* impl) {
+ grpc_tcp_server_impl = impl;
+}
diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h
index 965d97407f..8fcbb2f680 100644
--- a/src/core/lib/iomgr/tcp_server.h
+++ b/src/core/lib/iomgr/tcp_server.h
@@ -45,6 +45,24 @@ typedef void (*grpc_tcp_server_cb)(void* arg, grpc_endpoint* ep,
grpc_pollset* accepting_pollset,
grpc_tcp_server_acceptor* acceptor);
+typedef struct grpc_tcp_server_vtable {
+ grpc_error* (*create)(grpc_closure* shutdown_complete,
+ const grpc_channel_args* args,
+ grpc_tcp_server** server);
+ void (*start)(grpc_tcp_server* server, grpc_pollset** pollsets,
+ size_t pollset_count, grpc_tcp_server_cb on_accept_cb,
+ void* cb_arg);
+ grpc_error* (*add_port)(grpc_tcp_server* s, const grpc_resolved_address* addr,
+ int* out_port);
+ unsigned (*port_fd_count)(grpc_tcp_server* s, unsigned port_index);
+ int (*port_fd)(grpc_tcp_server* s, unsigned port_index, unsigned fd_index);
+ grpc_tcp_server* (*ref)(grpc_tcp_server* s);
+ void (*shutdown_starting_add)(grpc_tcp_server* s,
+ grpc_closure* shutdown_starting);
+ void (*unref)(grpc_tcp_server* s);
+ void (*shutdown_listeners)(grpc_tcp_server* s);
+} grpc_tcp_server_vtable;
+
/* Create a server, initially not bound to any ports. The caller owns one ref.
If shutdown_complete is not NULL, it will be used by
grpc_tcp_server_unref() when the ref count reaches zero. */
@@ -97,4 +115,8 @@ void grpc_tcp_server_unref(grpc_tcp_server* s);
/* Shutdown the fds of listeners. */
void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s);
+void grpc_tcp_server_global_init();
+
+void grpc_set_tcp_server_impl(grpc_tcp_server_vtable* impl);
+
#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_H */
diff --git a/src/core/lib/iomgr/tcp_server_custom.cc b/src/core/lib/iomgr/tcp_server_custom.cc
new file mode 100644
index 0000000000..be92e61b62
--- /dev/null
+++ b/src/core/lib/iomgr/tcp_server_custom.cc
@@ -0,0 +1,479 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <assert.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/sockaddr.h"
+#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/tcp_custom.h"
+#include "src/core/lib/iomgr/tcp_server.h"
+
+extern grpc_core::TraceFlag grpc_tcp_trace;
+
+extern grpc_socket_vtable* grpc_custom_socket_vtable;
+
+/* one listening port */
+struct grpc_tcp_listener {
+ grpc_tcp_server* server;
+ unsigned port_index;
+ int port;
+
+ grpc_custom_socket* socket;
+
+ /* linked list */
+ struct grpc_tcp_listener* next;
+
+ bool closed;
+};
+
+struct grpc_tcp_server {
+ gpr_refcount refs;
+
+ /* Called whenever accept() succeeds on a server port. */
+ grpc_tcp_server_cb on_accept_cb;
+ void* on_accept_cb_arg;
+
+ int open_ports;
+
+ /* linked list of server ports */
+ grpc_tcp_listener* head;
+ grpc_tcp_listener* tail;
+
+ /* List of closures passed to shutdown_starting_add(). */
+ grpc_closure_list shutdown_starting;
+
+ /* shutdown callback */
+ grpc_closure* shutdown_complete;
+
+ bool shutdown;
+
+ grpc_resource_quota* resource_quota;
+};
+
+static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
+ const grpc_channel_args* args,
+ grpc_tcp_server** server) {
+ grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
+ s->resource_quota = grpc_resource_quota_create(nullptr);
+ for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
+ if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
+ if (args->args[i].type == GRPC_ARG_POINTER) {
+ grpc_resource_quota_unref_internal(s->resource_quota);
+ s->resource_quota = grpc_resource_quota_ref_internal(
+ (grpc_resource_quota*)args->args[i].value.pointer.p);
+ } else {
+ grpc_resource_quota_unref_internal(s->resource_quota);
+ gpr_free(s);
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool");
+ }
+ }
+ }
+ gpr_ref_init(&s->refs, 1);
+ s->on_accept_cb = nullptr;
+ s->on_accept_cb_arg = nullptr;
+ s->open_ports = 0;
+ s->head = nullptr;
+ s->tail = nullptr;
+ s->shutdown_starting.head = nullptr;
+ s->shutdown_starting.tail = nullptr;
+ s->shutdown_complete = shutdown_complete;
+ s->shutdown = false;
+ *server = s;
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ gpr_ref(&s->refs);
+ return s;
+}
+
+static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
+ grpc_closure* shutdown_starting) {
+ grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
+ GRPC_ERROR_NONE);
+}
+
+static void finish_shutdown(grpc_tcp_server* s) {
+ GPR_ASSERT(s->shutdown);
+ if (s->shutdown_complete != nullptr) {
+ GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
+ }
+
+ while (s->head) {
+ grpc_tcp_listener* sp = s->head;
+ s->head = sp->next;
+ sp->next = nullptr;
+ gpr_free(sp);
+ }
+ grpc_resource_quota_unref_internal(s->resource_quota);
+ gpr_free(s);
+}
+
+static void custom_close_callback(grpc_custom_socket* socket) {
+ grpc_tcp_listener* sp = socket->listener;
+ if (sp) {
+ grpc_core::ExecCtx exec_ctx;
+ sp->server->open_ports--;
+ if (sp->server->open_ports == 0 && sp->server->shutdown) {
+ finish_shutdown(sp->server);
+ }
+ }
+ socket->refs--;
+ if (socket->refs == 0) {
+ grpc_custom_socket_vtable->destroy(socket);
+ gpr_free(socket);
+ }
+}
+
+void grpc_custom_close_server_callback(grpc_tcp_listener* sp) {
+ if (sp) {
+ grpc_core::ExecCtx exec_ctx;
+ sp->server->open_ports--;
+ if (sp->server->open_ports == 0 && sp->server->shutdown) {
+ finish_shutdown(sp->server);
+ }
+ }
+}
+
+static void close_listener(grpc_tcp_listener* sp) {
+ grpc_custom_socket* socket = sp->socket;
+ if (!sp->closed) {
+ sp->closed = true;
+ grpc_custom_socket_vtable->close(socket, custom_close_callback);
+ }
+}
+
+static void tcp_server_destroy(grpc_tcp_server* s) {
+ int immediately_done = 0;
+ grpc_tcp_listener* sp;
+
+ GPR_ASSERT(!s->shutdown);
+ s->shutdown = true;
+
+ if (s->open_ports == 0) {
+ immediately_done = 1;
+ }
+ for (sp = s->head; sp; sp = sp->next) {
+ close_listener(sp);
+ }
+
+ if (immediately_done) {
+ finish_shutdown(s);
+ }
+}
+
+static void tcp_server_unref(grpc_tcp_server* s) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ if (gpr_unref(&s->refs)) {
+ /* Complete shutdown_starting work before destroying. */
+ grpc_core::ExecCtx exec_ctx;
+ GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
+ grpc_core::ExecCtx::Get()->Flush();
+ tcp_server_destroy(s);
+ }
+}
+
+static void finish_accept(grpc_tcp_listener* sp, grpc_custom_socket* socket) {
+ grpc_tcp_server_acceptor* acceptor =
+ (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
+ grpc_endpoint* ep = nullptr;
+ grpc_resolved_address peer_name;
+ char* peer_name_string;
+ grpc_error* err;
+
+ peer_name_string = nullptr;
+ memset(&peer_name, 0, sizeof(grpc_resolved_address));
+ peer_name.len = GRPC_MAX_SOCKADDR_SIZE;
+ err = grpc_custom_socket_vtable->getpeername(
+ socket, (grpc_sockaddr*)&peer_name.addr, (int*)&peer_name.len);
+ if (err == GRPC_ERROR_NONE) {
+ peer_name_string = grpc_sockaddr_to_uri(&peer_name);
+ } else {
+ GRPC_LOG_IF_ERROR("getpeername error", err);
+ GRPC_ERROR_UNREF(err);
+ }
+ if (grpc_tcp_trace.enabled()) {
+ if (peer_name_string) {
+ gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s",
+ sp->server, peer_name_string);
+ } else {
+ gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server);
+ }
+ }
+ ep = custom_tcp_endpoint_create(socket, sp->server->resource_quota,
+ peer_name_string);
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = 0;
+ sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, nullptr, acceptor);
+ gpr_free(peer_name_string);
+}
+
+static void custom_accept_callback(grpc_custom_socket* socket,
+ grpc_custom_socket* client,
+ grpc_error* error);
+
+static void custom_accept_callback(grpc_custom_socket* socket,
+ grpc_custom_socket* client,
+ grpc_error* error) {
+ grpc_core::ExecCtx exec_ctx;
+ grpc_tcp_listener* sp = socket->listener;
+ if (error != GRPC_ERROR_NONE) {
+ if (!sp->closed) {
+ gpr_log(GPR_ERROR, "Accept failed: %s", grpc_error_string(error));
+ }
+ gpr_free(client);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ finish_accept(sp, client);
+ if (!sp->closed) {
+ grpc_custom_socket* new_socket =
+ (grpc_custom_socket*)gpr_malloc(sizeof(grpc_custom_socket));
+ new_socket->endpoint = nullptr;
+ new_socket->listener = nullptr;
+ new_socket->connector = nullptr;
+ new_socket->refs = 1;
+ grpc_custom_socket_vtable->accept(sp->socket, new_socket,
+ custom_accept_callback);
+ }
+}
+
+static grpc_error* add_socket_to_server(grpc_tcp_server* s,
+ grpc_custom_socket* socket,
+ const grpc_resolved_address* addr,
+ unsigned port_index,
+ grpc_tcp_listener** listener) {
+ grpc_tcp_listener* sp = nullptr;
+ int port = -1;
+ grpc_error* error;
+ grpc_resolved_address sockname_temp;
+
+ // The last argument to uv_tcp_bind is flags
+ error = grpc_custom_socket_vtable->bind(socket, (grpc_sockaddr*)addr->addr,
+ addr->len, 0);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+
+ error = grpc_custom_socket_vtable->listen(socket);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+
+ sockname_temp.len = GRPC_MAX_SOCKADDR_SIZE;
+ error = grpc_custom_socket_vtable->getsockname(
+ socket, (grpc_sockaddr*)&sockname_temp.addr, (int*)&sockname_temp.len);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
+ }
+
+ port = grpc_sockaddr_get_port(&sockname_temp);
+
+ GPR_ASSERT(port >= 0);
+ GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
+ sp = (grpc_tcp_listener*)gpr_zalloc(sizeof(grpc_tcp_listener));
+ sp->next = nullptr;
+ if (s->head == nullptr) {
+ s->head = sp;
+ } else {
+ s->tail->next = sp;
+ }
+ s->tail = sp;
+ sp->server = s;
+ sp->socket = socket;
+ sp->port = port;
+ sp->port_index = port_index;
+ sp->closed = false;
+ s->open_ports++;
+ *listener = sp;
+
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
+ const grpc_resolved_address* addr,
+ int* port) {
+ // This function is mostly copied from tcp_server_windows.c
+ grpc_tcp_listener* sp = nullptr;
+ grpc_custom_socket* socket;
+ grpc_resolved_address addr6_v4mapped;
+ grpc_resolved_address wildcard;
+ grpc_resolved_address* allocated_addr = nullptr;
+ grpc_resolved_address sockname_temp;
+ unsigned port_index = 0;
+ grpc_error* error = GRPC_ERROR_NONE;
+ int family;
+
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+
+ if (s->tail != nullptr) {
+ port_index = s->tail->port_index + 1;
+ }
+
+ /* Check if this is a wildcard port, and if so, try to keep the port the same
+ as some previously created listener. */
+ if (grpc_sockaddr_get_port(addr) == 0) {
+ for (sp = s->head; sp; sp = sp->next) {
+ socket = sp->socket;
+ sockname_temp.len = GRPC_MAX_SOCKADDR_SIZE;
+ if (nullptr == grpc_custom_socket_vtable->getsockname(
+ socket, (grpc_sockaddr*)&sockname_temp.addr,
+ (int*)&sockname_temp.len)) {
+ *port = grpc_sockaddr_get_port(&sockname_temp);
+ if (*port > 0) {
+ allocated_addr =
+ (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
+ memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
+ grpc_sockaddr_set_port(allocated_addr, *port);
+ addr = allocated_addr;
+ break;
+ }
+ }
+ }
+ }
+
+ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
+ addr = &addr6_v4mapped;
+ }
+
+ /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
+ if (grpc_sockaddr_is_wildcard(addr, port)) {
+ grpc_sockaddr_make_wildcard6(*port, &wildcard);
+
+ addr = &wildcard;
+ }
+
+ if (grpc_tcp_trace.enabled()) {
+ char* port_string;
+ grpc_sockaddr_to_string(&port_string, addr, 0);
+ const char* str = grpc_error_string(error);
+ if (port_string) {
+ gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str);
+ gpr_free(port_string);
+ } else {
+ gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str);
+ }
+ }
+
+ family = grpc_sockaddr_get_family(addr);
+ socket = (grpc_custom_socket*)gpr_malloc(sizeof(grpc_custom_socket));
+ socket->refs = 1;
+ socket->endpoint = nullptr;
+ socket->listener = nullptr;
+ socket->connector = nullptr;
+ grpc_custom_socket_vtable->init(socket, family);
+
+ if (error == GRPC_ERROR_NONE) {
+#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
+ if (family == AF_INET || family == AF_INET6) {
+ int enable = 1;
+ grpc_custom_socket_vtable->setsockopt(socket, SOL_SOCKET, SO_REUSEPORT,
+ &enable, sizeof(enable));
+ }
+#endif /* GPR_LINUX && SO_REUSEPORT */
+ error = add_socket_to_server(s, socket, addr, port_index, &sp);
+ }
+ gpr_free(allocated_addr);
+
+ if (error != GRPC_ERROR_NONE) {
+ grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
+ "Failed to add port to server", &error, 1);
+ GRPC_ERROR_UNREF(error);
+ error = error_out;
+ *port = -1;
+ } else {
+ GPR_ASSERT(sp != nullptr);
+ *port = sp->port;
+ }
+ socket->listener = sp;
+ return error;
+}
+
+static void tcp_server_start(grpc_tcp_server* server, grpc_pollset** pollsets,
+ size_t pollset_count,
+ grpc_tcp_server_cb on_accept_cb, void* cb_arg) {
+ grpc_tcp_listener* sp;
+ (void)pollsets;
+ (void)pollset_count;
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_DEBUG, "SERVER_START %p", server);
+ }
+ GPR_ASSERT(on_accept_cb);
+ GPR_ASSERT(!server->on_accept_cb);
+ server->on_accept_cb = on_accept_cb;
+ server->on_accept_cb_arg = cb_arg;
+ for (sp = server->head; sp; sp = sp->next) {
+ grpc_custom_socket* new_socket =
+ (grpc_custom_socket*)gpr_malloc(sizeof(grpc_custom_socket));
+ new_socket->endpoint = nullptr;
+ new_socket->listener = nullptr;
+ new_socket->connector = nullptr;
+ new_socket->refs = 1;
+ grpc_custom_socket_vtable->accept(sp->socket, new_socket,
+ custom_accept_callback);
+ }
+}
+
+static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
+ unsigned port_index) {
+ return 0;
+}
+
+static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
+ unsigned fd_index) {
+ return -1;
+}
+
+static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
+ for (grpc_tcp_listener* sp = s->head; sp; sp = sp->next) {
+ if (!sp->closed) {
+ sp->closed = true;
+ grpc_custom_socket_vtable->close(sp->socket, custom_close_callback);
+ }
+ }
+}
+
+grpc_tcp_server_vtable custom_tcp_server_vtable = {
+ tcp_server_create,
+ tcp_server_start,
+ tcp_server_add_port,
+ tcp_server_port_fd_count,
+ tcp_server_port_fd,
+ tcp_server_ref,
+ tcp_server_shutdown_starting_add,
+ tcp_server_unref,
+ tcp_server_shutdown_listeners};
+
+#ifdef GRPC_UV_TEST
+grpc_tcp_server_vtable* default_tcp_server_vtable = &custom_tcp_server_vtable;
+#endif
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index a609c09ea7..aba5d6cdb0 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -69,9 +69,9 @@ static void init(void) {
#endif
}
-grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
- const grpc_channel_args* args,
- grpc_tcp_server** server) {
+static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
+ const grpc_channel_args* args,
+ grpc_tcp_server** server) {
gpr_once_init(&check_init, init);
grpc_tcp_server* s =
@@ -392,9 +392,9 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
return GRPC_ERROR_NONE;
}
-grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
- const grpc_resolved_address* addr,
- int* out_port) {
+static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
+ const grpc_resolved_address* addr,
+ int* out_port) {
grpc_tcp_listener* sp;
grpc_resolved_address sockname_temp;
grpc_resolved_address addr6_v4mapped;
@@ -415,7 +415,7 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
sockname_temp.len = sizeof(struct sockaddr_storage);
if (0 ==
getsockname(sp->fd,
- reinterpret_cast<struct sockaddr*>(&sockname_temp.addr),
+ reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
reinterpret_cast<socklen_t*>(&sockname_temp.len))) {
int used_port = grpc_sockaddr_get_port(&sockname_temp);
if (used_port > 0) {
@@ -458,8 +458,7 @@ static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
return nullptr;
}
-unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server* s,
- unsigned port_index) {
+unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
unsigned num_fds = 0;
gpr_mu_lock(&s->mu);
grpc_tcp_listener* sp = get_port_index(s, port_index);
@@ -470,8 +469,8 @@ unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server* s,
return num_fds;
}
-int grpc_tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
- unsigned fd_index) {
+static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
+ unsigned fd_index) {
gpr_mu_lock(&s->mu);
grpc_tcp_listener* sp = get_port_index(s, port_index);
for (; sp; sp = sp->sibling, --fd_index) {
@@ -484,10 +483,10 @@ int grpc_tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
return -1;
}
-void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
- size_t pollset_count,
- grpc_tcp_server_cb on_accept_cb,
- void* on_accept_cb_arg) {
+static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
+ size_t pollset_count,
+ grpc_tcp_server_cb on_accept_cb,
+ void* on_accept_cb_arg) {
size_t i;
grpc_tcp_listener* sp;
GPR_ASSERT(on_accept_cb);
@@ -526,20 +525,20 @@ void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
gpr_mu_unlock(&s->mu);
}
-grpc_tcp_server* grpc_tcp_server_ref(grpc_tcp_server* s) {
+grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
gpr_ref_non_zero(&s->refs);
return s;
}
-void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server* s,
- grpc_closure* shutdown_starting) {
+static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
+ grpc_closure* shutdown_starting) {
gpr_mu_lock(&s->mu);
grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
GRPC_ERROR_NONE);
gpr_mu_unlock(&s->mu);
}
-void grpc_tcp_server_unref(grpc_tcp_server* s) {
+static void tcp_server_unref(grpc_tcp_server* s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(s);
gpr_mu_lock(&s->mu);
@@ -549,7 +548,7 @@ void grpc_tcp_server_unref(grpc_tcp_server* s) {
}
}
-void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) {
+static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
gpr_mu_lock(&s->mu);
s->shutdown_listeners = true;
/* shutdown all fd's */
@@ -563,4 +562,14 @@ void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) {
gpr_mu_unlock(&s->mu);
}
+grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
+ tcp_server_create,
+ tcp_server_start,
+ tcp_server_add_port,
+ tcp_server_port_fd_count,
+ tcp_server_port_fd,
+ tcp_server_ref,
+ tcp_server_shutdown_starting_add,
+ tcp_server_unref,
+ tcp_server_shutdown_listeners};
#endif
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
index 846f9cccb7..76d3d62940 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc
@@ -171,8 +171,7 @@ grpc_error* grpc_tcp_server_prepare_socket(int fd,
if (err != GRPC_ERROR_NONE) goto error;
GPR_ASSERT(addr->len < ~(socklen_t)0);
- if (bind(fd,
- reinterpret_cast<struct sockaddr*>(const_cast<char*>(addr->addr)),
+ if (bind(fd, reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),
static_cast<socklen_t>(addr->len)) < 0) {
err = GRPC_OS_ERROR(errno, "bind");
goto error;
@@ -185,7 +184,7 @@ grpc_error* grpc_tcp_server_prepare_socket(int fd,
sockname_temp.len = sizeof(struct sockaddr_storage);
- if (getsockname(fd, reinterpret_cast<struct sockaddr*>(sockname_temp.addr),
+ if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
reinterpret_cast<socklen_t*>(&sockname_temp.len)) < 0) {
err = GRPC_OS_ERROR(errno, "getsockname");
goto error;
diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc
index 308ff0f8a6..29ff9ecda1 100644
--- a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc
+++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc
@@ -68,13 +68,13 @@ static grpc_error* get_unused_port(int* port) {
if (dsmode == GRPC_DSMODE_IPV4) {
grpc_sockaddr_make_wildcard4(0, &wild);
}
- if (bind(fd, reinterpret_cast<const struct sockaddr*>(wild.addr),
+ if (bind(fd, reinterpret_cast<const grpc_sockaddr*>(wild.addr),
static_cast<socklen_t>(wild.len)) != 0) {
err = GRPC_OS_ERROR(errno, "bind");
close(fd);
return err;
}
- if (getsockname(fd, reinterpret_cast<struct sockaddr*>(wild.addr),
+ if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(wild.addr),
reinterpret_cast<socklen_t*>(&wild.len)) != 0) {
err = GRPC_OS_ERROR(errno, "getsockname");
close(fd);
@@ -119,9 +119,9 @@ grpc_error* grpc_tcp_server_add_all_local_addrs(grpc_tcp_server* s,
if (ifa_it->ifa_addr == nullptr) {
continue;
} else if (ifa_it->ifa_addr->sa_family == AF_INET) {
- addr.len = sizeof(struct sockaddr_in);
+ addr.len = sizeof(grpc_sockaddr_in);
} else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
- addr.len = sizeof(struct sockaddr_in6);
+ addr.len = sizeof(grpc_sockaddr_in6);
} else {
continue;
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.cc b/src/core/lib/iomgr/tcp_server_uv.cc
deleted file mode 100644
index aa423766c7..0000000000
--- a/src/core/lib/iomgr/tcp_server_uv.cc
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_UV
-
-#include <assert.h>
-#include <string.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-
-#include "src/core/lib/iomgr/error.h"
-#include "src/core/lib/iomgr/exec_ctx.h"
-#include "src/core/lib/iomgr/iomgr_uv.h"
-#include "src/core/lib/iomgr/sockaddr.h"
-#include "src/core/lib/iomgr/sockaddr_utils.h"
-#include "src/core/lib/iomgr/tcp_server.h"
-#include "src/core/lib/iomgr/tcp_uv.h"
-
-/* one listening port */
-typedef struct grpc_tcp_listener grpc_tcp_listener;
-struct grpc_tcp_listener {
- uv_tcp_t* handle;
- grpc_tcp_server* server;
- unsigned port_index;
- int port;
- /* linked list */
- struct grpc_tcp_listener* next;
-
- bool closed;
-
- bool has_pending_connection;
-};
-
-struct grpc_tcp_server {
- gpr_refcount refs;
-
- /* Called whenever accept() succeeds on a server port. */
- grpc_tcp_server_cb on_accept_cb;
- void* on_accept_cb_arg;
-
- int open_ports;
-
- /* linked list of server ports */
- grpc_tcp_listener* head;
- grpc_tcp_listener* tail;
-
- /* List of closures passed to shutdown_starting_add(). */
- grpc_closure_list shutdown_starting;
-
- /* shutdown callback */
- grpc_closure* shutdown_complete;
-
- bool shutdown;
-
- grpc_resource_quota* resource_quota;
-};
-
-grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
- const grpc_channel_args* args,
- grpc_tcp_server** server) {
- grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
- s->resource_quota = grpc_resource_quota_create(NULL);
- for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
- if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
- if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_unref_internal(s->resource_quota);
- s->resource_quota = grpc_resource_quota_ref_internal(
- (grpc_resource_quota*)args->args[i].value.pointer.p);
- } else {
- grpc_resource_quota_unref_internal(s->resource_quota);
- gpr_free(s);
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool");
- }
- }
- }
- gpr_ref_init(&s->refs, 1);
- s->on_accept_cb = NULL;
- s->on_accept_cb_arg = NULL;
- s->open_ports = 0;
- s->head = NULL;
- s->tail = NULL;
- s->shutdown_starting.head = NULL;
- s->shutdown_starting.tail = NULL;
- s->shutdown_complete = shutdown_complete;
- s->shutdown = false;
- *server = s;
- return GRPC_ERROR_NONE;
-}
-
-grpc_tcp_server* grpc_tcp_server_ref(grpc_tcp_server* s) {
- GRPC_UV_ASSERT_SAME_THREAD();
- gpr_ref(&s->refs);
- return s;
-}
-
-void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server* s,
- grpc_closure* shutdown_starting) {
- grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
- GRPC_ERROR_NONE);
-}
-
-static void finish_shutdown(grpc_tcp_server* s) {
- GPR_ASSERT(s->shutdown);
- if (s->shutdown_complete != NULL) {
- GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
- }
-
- while (s->head) {
- grpc_tcp_listener* sp = s->head;
- s->head = sp->next;
- sp->next = NULL;
- gpr_free(sp->handle);
- gpr_free(sp);
- }
- grpc_resource_quota_unref_internal(s->resource_quota);
- gpr_free(s);
-}
-
-static void handle_close_callback(uv_handle_t* handle) {
- grpc_tcp_listener* sp = (grpc_tcp_listener*)handle->data;
- grpc_core::ExecCtx exec_ctx;
- sp->server->open_ports--;
- if (sp->server->open_ports == 0 && sp->server->shutdown) {
- finish_shutdown(sp->server);
- }
-}
-
-static void close_listener(grpc_tcp_listener* sp) {
- if (!sp->closed) {
- sp->closed = true;
- uv_close((uv_handle_t*)sp->handle, handle_close_callback);
- }
-}
-
-static void tcp_server_destroy(grpc_tcp_server* s) {
- int immediately_done = 0;
- grpc_tcp_listener* sp;
-
- GPR_ASSERT(!s->shutdown);
- s->shutdown = true;
-
- if (s->open_ports == 0) {
- immediately_done = 1;
- }
- for (sp = s->head; sp; sp = sp->next) {
- close_listener(sp);
- }
-
- if (immediately_done) {
- finish_shutdown(s);
- }
-}
-
-void grpc_tcp_server_unref(grpc_tcp_server* s) {
- GRPC_UV_ASSERT_SAME_THREAD();
- if (gpr_unref(&s->refs)) {
- /* Complete shutdown_starting work before destroying. */
- grpc_core::ExecCtx exec_ctx;
- GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
- grpc_core::ExecCtx::Get()->Flush();
- tcp_server_destroy(s);
- }
-}
-
-static void finish_accept(grpc_tcp_listener* sp) {
- grpc_tcp_server_acceptor* acceptor =
- (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
- uv_tcp_t* client = NULL;
- grpc_endpoint* ep = NULL;
- grpc_resolved_address peer_name;
- char* peer_name_string;
- int err;
- uv_tcp_t* server = sp->handle;
-
- client = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
- uv_tcp_init(uv_default_loop(), client);
- // UV documentation says this is guaranteed to succeed
- uv_accept((uv_stream_t*)server, (uv_stream_t*)client);
- peer_name_string = NULL;
- memset(&peer_name, 0, sizeof(grpc_resolved_address));
- peer_name.len = sizeof(struct sockaddr_storage);
- err = uv_tcp_getpeername(client, (struct sockaddr*)&peer_name.addr,
- (int*)&peer_name.len);
- if (err == 0) {
- peer_name_string = grpc_sockaddr_to_uri(&peer_name);
- } else {
- gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err));
- }
- if (grpc_tcp_trace.enabled()) {
- if (peer_name_string) {
- gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s",
- sp->server, peer_name_string);
- } else {
- gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server);
- }
- }
- ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
- acceptor->from_server = sp->server;
- acceptor->port_index = sp->port_index;
- acceptor->fd_index = 0;
- sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
- gpr_free(peer_name_string);
-}
-
-static void on_connect(uv_stream_t* server, int status) {
- grpc_tcp_listener* sp = (grpc_tcp_listener*)server->data;
- grpc_core::ExecCtx exec_ctx;
-
- if (status < 0) {
- switch (status) {
- case UV_EINTR:
- case UV_EAGAIN:
- return;
- default:
- close_listener(sp);
- return;
- }
- }
-
- GPR_ASSERT(!sp->has_pending_connection);
-
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server);
- }
-
- // Create acceptor.
- if (sp->server->on_accept_cb) {
- finish_accept(sp);
- } else {
- sp->has_pending_connection = true;
- }
-}
-
-static grpc_error* add_addr_to_server(grpc_tcp_server* s,
- const grpc_resolved_address* addr,
- unsigned port_index,
- grpc_tcp_listener** listener) {
- grpc_tcp_listener* sp = NULL;
- int port = -1;
- int status;
- grpc_error* error;
- grpc_resolved_address sockname_temp;
- uv_tcp_t* handle = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
- int family = grpc_sockaddr_get_family(addr);
-
- status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
-#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
- if (family == AF_INET || family == AF_INET6) {
- int fd;
- uv_fileno((uv_handle_t*)handle, &fd);
- int enable = 1;
- setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
- }
-#endif /* GPR_LINUX && SO_REUSEPORT */
-
- if (status != 0) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Failed to initialize UV tcp handle");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- return error;
- }
-
- // The last argument to uv_tcp_bind is flags
- status = uv_tcp_bind(handle, (struct sockaddr*)addr->addr, 0);
- if (status != 0) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to bind to port");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- return error;
- }
-
- status = uv_listen((uv_stream_t*)handle, SOMAXCONN, on_connect);
- if (status != 0) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Failed to listen to port");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- return error;
- }
-
- sockname_temp.len = (int)sizeof(struct sockaddr_storage);
- status = uv_tcp_getsockname(handle, (struct sockaddr*)&sockname_temp.addr,
- (int*)&sockname_temp.len);
- if (status != 0) {
- error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getsockname failed");
- error =
- grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- return error;
- }
-
- port = grpc_sockaddr_get_port(&sockname_temp);
-
- GPR_ASSERT(port >= 0);
- GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
- sp = (grpc_tcp_listener*)gpr_zalloc(sizeof(grpc_tcp_listener));
- sp->next = NULL;
- if (s->head == NULL) {
- s->head = sp;
- } else {
- s->tail->next = sp;
- }
- s->tail = sp;
- sp->server = s;
- sp->handle = handle;
- sp->port = port;
- sp->port_index = port_index;
- sp->closed = false;
- handle->data = sp;
- s->open_ports++;
- GPR_ASSERT(sp->handle);
- *listener = sp;
-
- return GRPC_ERROR_NONE;
-}
-
-static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
- unsigned port_index,
- int requested_port,
- grpc_tcp_listener** listener) {
- grpc_resolved_address wild4;
- grpc_resolved_address wild6;
- grpc_tcp_listener* sp = nullptr;
- grpc_tcp_listener* sp2 = nullptr;
- grpc_error* v6_err = GRPC_ERROR_NONE;
- grpc_error* v4_err = GRPC_ERROR_NONE;
-
- grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
- /* Try listening on IPv6 first. */
- if ((v6_err = add_addr_to_server(s, &wild6, port_index, &sp)) ==
- GRPC_ERROR_NONE) {
- *listener = sp;
- return GRPC_ERROR_NONE;
- }
-
- if ((v4_err = add_addr_to_server(s, &wild4, port_index, &sp2)) ==
- GRPC_ERROR_NONE) {
- *listener = sp2;
- return GRPC_ERROR_NONE;
- }
-
- grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Failed to add any wildcard listeners");
- root_err = grpc_error_add_child(root_err, v6_err);
- root_err = grpc_error_add_child(root_err, v4_err);
- return root_err;
-}
-
-grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
- const grpc_resolved_address* addr,
- int* port) {
- // This function is mostly copied from tcp_server_windows.c
- grpc_tcp_listener* sp = NULL;
- grpc_resolved_address addr6_v4mapped;
- grpc_resolved_address* allocated_addr = NULL;
- grpc_resolved_address sockname_temp;
- unsigned port_index = 0;
- grpc_error* error = GRPC_ERROR_NONE;
-
- GRPC_UV_ASSERT_SAME_THREAD();
-
- if (s->tail != NULL) {
- port_index = s->tail->port_index + 1;
- }
-
- /* Check if this is a wildcard port, and if so, try to keep the port the same
- as some previously created listener. */
- if (grpc_sockaddr_get_port(addr) == 0) {
- for (sp = s->head; sp; sp = sp->next) {
- sockname_temp.len = sizeof(struct sockaddr_storage);
- if (0 == uv_tcp_getsockname(sp->handle,
- (struct sockaddr*)&sockname_temp.addr,
- (int*)&sockname_temp.len)) {
- *port = grpc_sockaddr_get_port(&sockname_temp);
- if (*port > 0) {
- allocated_addr =
- (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
- memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
- grpc_sockaddr_set_port(allocated_addr, *port);
- addr = allocated_addr;
- break;
- }
- }
- }
- }
-
- /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
- if (grpc_sockaddr_is_wildcard(addr, port)) {
- error = add_wildcard_addrs_to_server(s, port_index, *port, &sp);
- } else {
- if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
- addr = &addr6_v4mapped;
- }
-
- error = add_addr_to_server(s, addr, port_index, &sp);
- }
-
- gpr_free(allocated_addr);
-
- if (grpc_tcp_trace.enabled()) {
- char* port_string;
- grpc_sockaddr_to_string(&port_string, addr, 0);
- const char* str = grpc_error_string(error);
- if (port_string) {
- gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str);
- gpr_free(port_string);
- } else {
- gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str);
- }
- }
-
- if (error != GRPC_ERROR_NONE) {
- grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Failed to add port to server", &error, 1);
- GRPC_ERROR_UNREF(error);
- error = error_out;
- *port = -1;
- } else {
- GPR_ASSERT(sp != NULL);
- *port = sp->port;
- }
- return error;
-}
-
-void grpc_tcp_server_start(grpc_tcp_server* server, grpc_pollset** pollsets,
- size_t pollset_count,
- grpc_tcp_server_cb on_accept_cb, void* cb_arg) {
- grpc_tcp_listener* sp;
- (void)pollsets;
- (void)pollset_count;
- GRPC_UV_ASSERT_SAME_THREAD();
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "SERVER_START %p", server);
- }
- GPR_ASSERT(on_accept_cb);
- GPR_ASSERT(!server->on_accept_cb);
- server->on_accept_cb = on_accept_cb;
- server->on_accept_cb_arg = cb_arg;
- for (sp = server->head; sp; sp = sp->next) {
- if (sp->has_pending_connection) {
- finish_accept(sp);
- sp->has_pending_connection = false;
- }
- }
-}
-
-void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
-
-#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc
index 6d19c1c4d7..77f3811dca 100644
--- a/src/core/lib/iomgr/tcp_server_windows.cc
+++ b/src/core/lib/iomgr/tcp_server_windows.cc
@@ -50,7 +50,7 @@ typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
/* This seemingly magic number comes from AcceptEx's documentation. each
address buffer needs to have at least 16 more bytes at their end. */
- uint8_t addresses[(sizeof(struct sockaddr_in6) + 16) * 2];
+ uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
/* This will hold the socket for the next accept. */
SOCKET new_socket;
/* The listener winsocket. */
@@ -96,9 +96,9 @@ struct grpc_tcp_server {
/* Public function. Allocates the proper data structures to hold a
grpc_tcp_server. */
-grpc_error* grpc_tcp_server_create(grpc_closure* shutdown_complete,
- const grpc_channel_args* args,
- grpc_tcp_server** server) {
+static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
+ const grpc_channel_args* args,
+ grpc_tcp_server** server) {
grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
s->channel_args = grpc_channel_args_copy(args);
gpr_ref_init(&s->refs, 1);
@@ -142,13 +142,13 @@ static void finish_shutdown_locked(grpc_tcp_server* s) {
GRPC_ERROR_NONE);
}
-grpc_tcp_server* grpc_tcp_server_ref(grpc_tcp_server* s) {
+static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
gpr_ref_non_zero(&s->refs);
return s;
}
-void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server* s,
- grpc_closure* shutdown_starting) {
+static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
+ grpc_closure* shutdown_starting) {
gpr_mu_lock(&s->mu);
grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
GRPC_ERROR_NONE);
@@ -172,7 +172,7 @@ static void tcp_server_destroy(grpc_tcp_server* s) {
gpr_mu_unlock(&s->mu);
}
-void grpc_tcp_server_unref(grpc_tcp_server* s) {
+static void tcp_server_unref(grpc_tcp_server* s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(s);
gpr_mu_lock(&s->mu);
@@ -195,7 +195,7 @@ static grpc_error* prepare_socket(SOCKET sock,
goto failure;
}
- if (bind(sock, (const struct sockaddr*)addr->addr, (int)addr->len) ==
+ if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
SOCKET_ERROR) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
goto failure;
@@ -207,7 +207,7 @@ static grpc_error* prepare_socket(SOCKET sock,
}
sockname_temp_len = sizeof(struct sockaddr_storage);
- if (getsockname(sock, (struct sockaddr*)sockname_temp.addr,
+ if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
&sockname_temp_len) == SOCKET_ERROR) {
error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
goto failure;
@@ -245,7 +245,7 @@ static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
SOCKET sock = INVALID_SOCKET;
BOOL success;
- DWORD addrlen = sizeof(struct sockaddr_in6) + 16;
+ DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
DWORD bytes_received = 0;
grpc_error* error = GRPC_ERROR_NONE;
@@ -343,7 +343,7 @@ static void on_accept(void* arg, grpc_error* error) {
gpr_free(utf8_message);
}
int peer_name_len = (int)peer_name.len;
- err = getpeername(sock, (struct sockaddr*)peer_name.addr, &peer_name_len);
+ err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
peer_name.len = (size_t)peer_name_len;
if (!err) {
peer_name_string = grpc_sockaddr_to_uri(&peer_name);
@@ -442,9 +442,9 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
return GRPC_ERROR_NONE;
}
-grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
- const grpc_resolved_address* addr,
- int* port) {
+static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
+ const grpc_resolved_address* addr,
+ int* port) {
grpc_tcp_listener* sp = NULL;
SOCKET sock;
grpc_resolved_address addr6_v4mapped;
@@ -464,7 +464,7 @@ grpc_error* grpc_tcp_server_add_port(grpc_tcp_server* s,
for (sp = s->head; sp; sp = sp->next) {
int sockname_temp_len = sizeof(struct sockaddr_storage);
if (0 == getsockname(sp->socket->socket,
- (struct sockaddr*)sockname_temp.addr,
+ (grpc_sockaddr*)sockname_temp.addr,
&sockname_temp_len)) {
sockname_temp.len = (size_t)sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp);
@@ -516,10 +516,10 @@ done:
return error;
}
-void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
- size_t pollset_count,
- grpc_tcp_server_cb on_accept_cb,
- void* on_accept_cb_arg) {
+static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
+ size_t pollset_count,
+ grpc_tcp_server_cb on_accept_cb,
+ void* on_accept_cb_arg) {
grpc_tcp_listener* sp;
GPR_ASSERT(on_accept_cb);
gpr_mu_lock(&s->mu);
@@ -534,6 +534,26 @@ void grpc_tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
gpr_mu_unlock(&s->mu);
}
-void grpc_tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
+static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
+ unsigned port_index) {
+ return 0;
+}
+
+static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
+ unsigned fd_index) {
+ return -1;
+}
+static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
+
+grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
+ tcp_server_create,
+ tcp_server_start,
+ tcp_server_add_port,
+ tcp_server_port_fd_count,
+ tcp_server_port_fd,
+ tcp_server_ref,
+ tcp_server_shutdown_starting_add,
+ tcp_server_unref,
+ tcp_server_shutdown_listeners};
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc
index 6db3217d6e..5e3166926b 100644
--- a/src/core/lib/iomgr/tcp_uv.cc
+++ b/src/core/lib/iomgr/tcp_uv.cc
@@ -21,7 +21,6 @@
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_UV
-
#include <limits.h>
#include <string.h>
@@ -33,393 +32,393 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/error.h"
-#include "src/core/lib/iomgr/iomgr_uv.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
+#include "src/core/lib/iomgr/resolve_address_custom.h"
#include "src/core/lib/iomgr/resource_quota.h"
-#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/iomgr/tcp_custom.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
+#include <uv.h>
-typedef struct {
- grpc_endpoint base;
- gpr_refcount refcount;
+#define IGNORE_CONST(addr) ((grpc_sockaddr*)(uintptr_t)(addr))
+typedef struct uv_socket_t {
+ uv_connect_t connect_req;
uv_write_t write_req;
uv_shutdown_t shutdown_req;
-
uv_tcp_t* handle;
-
- grpc_closure* read_cb;
- grpc_closure* write_cb;
-
- grpc_slice_buffer* read_slices;
- grpc_slice_buffer* write_slices;
uv_buf_t* write_buffers;
- grpc_resource_user* resource_user;
- grpc_resource_user_slice_allocator slice_allocator;
-
- bool shutting_down;
+ char* read_buf;
+ size_t read_len;
- char* peer_string;
- grpc_pollset* pollset;
-} grpc_tcp;
+ bool pending_connection;
+ grpc_custom_socket* accept_socket;
+ grpc_error* accept_error;
-static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) {
- return grpc_error_set_str(
- grpc_error_set_int(
- src_error,
- /* All tcp errors are marked with UNAVAILABLE so that application may
- * choose to retry. */
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
- GRPC_ERROR_STR_TARGET_ADDRESS,
- grpc_slice_from_copied_string(tcp->peer_string));
-}
+ grpc_custom_connect_callback connect_cb;
+ grpc_custom_write_callback write_cb;
+ grpc_custom_read_callback read_cb;
+ grpc_custom_accept_callback accept_cb;
+ grpc_custom_close_callback close_cb;
-static void tcp_free(grpc_tcp* tcp) {
- grpc_resource_user_unref(tcp->resource_user);
- gpr_free(tcp->handle);
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
-}
-
-#ifndef NDEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
- int line) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
- val - 1);
- }
- if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
- }
-}
+} uv_socket_t;
-static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
- int line) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
- val + 1);
- }
- gpr_ref(&tcp->refcount);
-}
-#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
-#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp* tcp) {
- if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+static grpc_error* tcp_error_create(const char* desc, int status) {
+ if (status == 0) {
+ return GRPC_ERROR_NONE;
}
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc);
+ /* All tcp errors are marked with UNAVAILABLE so that application may
+ * choose to retry. */
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ return grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ grpc_slice_from_static_string(uv_strerror(status)));
}
-static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
-#endif
-
-static void uv_close_callback(uv_handle_t* handle) {
- grpc_core::ExecCtx exec_ctx;
- grpc_tcp* tcp = (grpc_tcp*)handle->data;
- TCP_UNREF(tcp, "destroy");
+static void uv_socket_destroy(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ gpr_free(uv_socket->handle);
+ gpr_free(uv_socket);
}
static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size,
uv_buf_t* buf) {
- grpc_core::ExecCtx exec_ctx;
- grpc_tcp* tcp = (grpc_tcp*)handle->data;
+ uv_socket_t* uv_socket =
+ (uv_socket_t*)((grpc_custom_socket*)handle->data)->impl;
(void)suggested_size;
- /* Before calling uv_read_start, we allocate a buffer with exactly one slice
- * to tcp->read_slices and wait for the callback indicating that the
- * allocation was successful. So slices[0] should always exist here */
- buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]);
- buf->len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]);
-}
-
-static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
- grpc_closure* cb = tcp->read_cb;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
- size_t i;
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "read: error=%s", str);
-
- for (i = 0; i < tcp->read_slices->count; i++) {
- char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
- gpr_free(dump);
- }
- }
- tcp->read_slices = NULL;
- tcp->read_cb = NULL;
- GRPC_CLOSURE_RUN(cb, error);
+ buf->base = uv_socket->read_buf;
+ buf->len = uv_socket->read_len;
}
-static void read_callback(uv_stream_t* stream, ssize_t nread,
- const uv_buf_t* buf) {
- grpc_error* error;
- grpc_core::ExecCtx exec_ctx;
- grpc_tcp* tcp = (grpc_tcp*)stream->data;
- grpc_slice_buffer garbage;
+static void uv_read_callback(uv_stream_t* stream, ssize_t nread,
+ const uv_buf_t* buf) {
+ grpc_error* error = GRPC_ERROR_NONE;
if (nread == 0) {
// Nothing happened. Wait for the next callback
return;
}
- TCP_UNREF(tcp, "read");
// TODO(murgatroid99): figure out what the return value here means
uv_read_stop(stream);
if (nread == UV_EOF) {
- error =
- tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp);
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- } else if (nread > 0) {
- // Successful read
- error = GRPC_ERROR_NONE;
- if ((size_t)nread < tcp->read_slices->length) {
- /* TODO(murgatroid99): Instead of discarding the unused part of the read
- * buffer, reuse it as the next read buffer. */
- grpc_slice_buffer_init(&garbage);
- grpc_slice_buffer_trim_end(
- tcp->read_slices, tcp->read_slices->length - (size_t)nread, &garbage);
- grpc_slice_buffer_reset_and_unref_internal(&garbage);
- }
- } else {
- // nread < 0: Error
- error = tcp_annotate_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"), tcp);
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
+ } else if (nread < 0) {
+ error = tcp_error_create("TCP Read failed", nread);
}
- call_read_cb(tcp, error);
+ grpc_custom_socket* socket = (grpc_custom_socket*)stream->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->read_cb(socket, (size_t)nread, error);
}
-static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
- int status;
- grpc_tcp* tcp = (grpc_tcp*)tcpp;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp,
- grpc_error_string(error));
- }
- if (error == GRPC_ERROR_NONE) {
- status =
- uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback);
- if (status != 0) {
- error = tcp_annotate_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"),
- tcp);
- error = grpc_error_set_str(
- error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- }
- }
- if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- call_read_cb(tcp, GRPC_ERROR_REF(error));
- TCP_UNREF(tcp, "read");
+static void uv_close_callback(uv_handle_t* handle) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)handle->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ if (uv_socket->accept_socket) {
+ uv_socket->accept_cb(socket, uv_socket->accept_socket,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket closed"));
}
- if (grpc_tcp_trace.enabled()) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str);
+ uv_socket->close_cb(socket);
+}
+
+static void uv_socket_read(grpc_custom_socket* socket, char* buffer,
+ size_t length, grpc_custom_read_callback read_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int status;
+ grpc_error* error;
+ uv_socket->read_cb = read_cb;
+ uv_socket->read_buf = buffer;
+ uv_socket->read_len = length;
+ // TODO(murgatroid99): figure out what the return value here means
+ status =
+ uv_read_start((uv_stream_t*)uv_socket->handle, (uv_alloc_cb)alloc_uv_buf,
+ (uv_read_cb)uv_read_callback);
+ if (status != 0) {
+ error = tcp_error_create("TCP Read failed at start", status);
+ uv_socket->read_cb(socket, 0, error);
}
}
-static void uv_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
- grpc_closure* cb) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- GRPC_UV_ASSERT_SAME_THREAD();
- GPR_ASSERT(tcp->read_cb == NULL);
- tcp->read_cb = cb;
- tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref_internal(read_slices);
- TCP_REF(tcp, "read");
- grpc_resource_user_alloc_slices(&tcp->slice_allocator,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
- tcp->read_slices);
+static void uv_write_callback(uv_write_t* req, int status) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ gpr_free(uv_socket->write_buffers);
+ uv_socket->write_cb(socket, tcp_error_create("TCP Write failed", status));
}
-static void write_callback(uv_write_t* req, int status) {
- grpc_tcp* tcp = (grpc_tcp*)req->data;
- grpc_error* error;
- grpc_core::ExecCtx exec_ctx;
- grpc_closure* cb = tcp->write_cb;
- tcp->write_cb = NULL;
- TCP_UNREF(tcp, "write");
- if (status == 0) {
- error = GRPC_ERROR_NONE;
- } else {
- error = tcp_annotate_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Write failed"), tcp);
- }
- if (grpc_tcp_trace.enabled()) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
+void uv_socket_write(grpc_custom_socket* socket,
+ grpc_slice_buffer* write_slices,
+ grpc_custom_write_callback write_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->write_cb = write_cb;
+ uv_buf_t* uv_buffers;
+ uv_write_t* write_req;
+
+ uv_buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * write_slices->count);
+ for (size_t i = 0; i < write_slices->count; i++) {
+ uv_buffers[i].base = (char*)GRPC_SLICE_START_PTR(write_slices->slices[i]);
+ uv_buffers[i].len = GRPC_SLICE_LENGTH(write_slices->slices[i]);
}
- gpr_free(tcp->write_buffers);
- GRPC_CLOSURE_SCHED(cb, error);
+
+ uv_socket->write_buffers = uv_buffers;
+ write_req = &uv_socket->write_req;
+ write_req->data = socket;
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_write(write_req, (uv_stream_t*)uv_socket->handle, uv_buffers,
+ write_slices->count, uv_write_callback);
}
-static void uv_endpoint_write(grpc_endpoint* ep,
- grpc_slice_buffer* write_slices,
- grpc_closure* cb) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- uv_buf_t* buffers;
- unsigned int buffer_count;
- unsigned int i;
- grpc_slice* slice;
- uv_write_t* write_req;
- GRPC_UV_ASSERT_SAME_THREAD();
+static void shutdown_callback(uv_shutdown_t* req, int status) {}
- if (grpc_tcp_trace.enabled()) {
- size_t j;
+static void uv_socket_shutdown(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_shutdown_t* req = &uv_socket->shutdown_req;
+ uv_shutdown(req, (uv_stream_t*)uv_socket->handle, shutdown_callback);
+}
- for (j = 0; j < write_slices->count; j++) {
- char* data = grpc_dump_slice(write_slices->slices[j],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
- gpr_free(data);
- }
+static void uv_socket_close(grpc_custom_socket* socket,
+ grpc_custom_close_callback close_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->close_cb = close_cb;
+ uv_close((uv_handle_t*)uv_socket->handle, uv_close_callback);
+}
+
+static grpc_error* uv_socket_init_helper(uv_socket_t* uv_socket, int domain) {
+ uv_tcp_t* tcp = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
+ uv_socket->handle = tcp;
+ int status = uv_tcp_init_ex(uv_default_loop(), tcp, (unsigned int)domain);
+ if (status != 0) {
+ return tcp_error_create("Failed to initialize UV tcp handle", status);
}
+ uv_socket->write_buffers = nullptr;
+ uv_socket->read_len = 0;
+ uv_tcp_nodelay(uv_socket->handle, 1);
+ uv_socket->pending_connection = false;
+ uv_socket->accept_socket = nullptr;
+ uv_socket->accept_error = GRPC_ERROR_NONE;
+ return GRPC_ERROR_NONE;
+}
- if (tcp->shutting_down) {
- GRPC_CLOSURE_SCHED(cb,
- tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "TCP socket is shutting down"),
- tcp));
- return;
+static grpc_error* uv_socket_init(grpc_custom_socket* socket, int domain) {
+ uv_socket_t* uv_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
+ grpc_error* error = uv_socket_init_helper(uv_socket, domain);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
}
+ uv_socket->handle->data = socket;
+ socket->impl = uv_socket;
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* uv_socket_getpeername(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr,
+ int* addr_len) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int err = uv_tcp_getpeername(uv_socket->handle,
+ (struct sockaddr*)IGNORE_CONST(addr), addr_len);
+ return tcp_error_create("getpeername failed", err);
+}
+
+static grpc_error* uv_socket_getsockname(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr,
+ int* addr_len) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int err = uv_tcp_getsockname(uv_socket->handle,
+ (struct sockaddr*)IGNORE_CONST(addr), addr_len);
+ return tcp_error_create("getsockname failed", err);
+}
- GPR_ASSERT(tcp->write_cb == NULL);
- tcp->write_slices = write_slices;
- GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
- if (tcp->write_slices->count == 0) {
- // No slices means we don't have to do anything,
- // and libuv doesn't like empty writes
- GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
+static void accept_new_connection(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ if (!uv_socket->pending_connection || !uv_socket->accept_socket) {
return;
}
+ grpc_custom_socket* new_socket = uv_socket->accept_socket;
+ grpc_error* error = uv_socket->accept_error;
+ uv_socket->accept_socket = nullptr;
+ uv_socket->accept_error = GRPC_ERROR_NONE;
+ uv_socket->pending_connection = false;
+ if (uv_socket->accept_error != GRPC_ERROR_NONE) {
+ uv_stream_t dummy_handle;
+ uv_accept((uv_stream_t*)uv_socket->handle, &dummy_handle);
+ uv_socket->accept_cb(socket, new_socket, error);
+ } else {
+ uv_socket_t* uv_new_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
+ uv_socket_init_helper(uv_new_socket, AF_UNSPEC);
+ // UV documentation says this is guaranteed to succeed
+ GPR_ASSERT(uv_accept((uv_stream_t*)uv_socket->handle,
+ (uv_stream_t*)uv_new_socket->handle) == 0);
+ new_socket->impl = uv_new_socket;
+ uv_new_socket->handle->data = new_socket;
+ uv_socket->accept_cb(socket, new_socket, error);
+ }
+}
- tcp->write_cb = cb;
- buffer_count = (unsigned int)tcp->write_slices->count;
- buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * buffer_count);
- for (i = 0; i < buffer_count; i++) {
- slice = &tcp->write_slices->slices[i];
- buffers[i].base = (char*)GRPC_SLICE_START_PTR(*slice);
- buffers[i].len = GRPC_SLICE_LENGTH(*slice);
+static void uv_on_connect(uv_stream_t* server, int status) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)server->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ GPR_ASSERT(!uv_socket->pending_connection);
+ uv_socket->pending_connection = true;
+ if (status < 0) {
+ switch (status) {
+ case UV_EINTR:
+ case UV_EAGAIN:
+ return;
+ default:
+ uv_socket->accept_error = tcp_error_create("accept failed", status);
+ }
}
- tcp->write_buffers = buffers;
- write_req = &tcp->write_req;
- write_req->data = tcp;
- TCP_REF(tcp, "write");
- // TODO(murgatroid99): figure out what the return value here means
- uv_write(write_req, (uv_stream_t*)tcp->handle, buffers, buffer_count,
- write_callback);
+ accept_new_connection(socket);
}
-static void uv_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
- // No-op. We're ignoring pollsets currently
- (void)ep;
- (void)pollset;
- grpc_tcp* tcp = (grpc_tcp*)ep;
- tcp->pollset = pollset;
+void uv_socket_accept(grpc_custom_socket* socket,
+ grpc_custom_socket* new_socket,
+ grpc_custom_accept_callback accept_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->accept_cb = accept_cb;
+ GPR_ASSERT(uv_socket->accept_socket == nullptr);
+ uv_socket->accept_socket = new_socket;
+ accept_new_connection(socket);
}
-static void uv_add_to_pollset_set(grpc_endpoint* ep,
- grpc_pollset_set* pollset) {
- // No-op. We're ignoring pollsets currently
- (void)ep;
- (void)pollset;
+static grpc_error* uv_socket_bind(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, size_t len,
+ int flags) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int status =
+ uv_tcp_bind((uv_tcp_t*)uv_socket->handle, (struct sockaddr*)addr, 0);
+ return tcp_error_create("Failed to bind to port", status);
}
-static void uv_delete_from_pollset_set(grpc_endpoint* ep,
- grpc_pollset_set* pollset) {
- // No-op. We're ignoring pollsets currently
- (void)ep;
- (void)pollset;
+static grpc_error* uv_socket_listen(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int status =
+ uv_listen((uv_stream_t*)uv_socket->handle, SOMAXCONN, uv_on_connect);
+ return tcp_error_create("Failed to listen to port", status);
}
-static void shutdown_callback(uv_shutdown_t* req, int status) {}
+static grpc_error* uv_socket_setsockopt(grpc_custom_socket* socket, int level,
+ int option_name, const void* optval,
+ socklen_t option_len) {
+ int fd;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_fileno((uv_handle_t*)uv_socket->handle, &fd);
+ // TODO Handle error here. Also, does this work on windows??
+ setsockopt(fd, level, option_name, &optval, (socklen_t)option_len);
+ return GRPC_ERROR_NONE;
+}
-static void uv_endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- if (!tcp->shutting_down) {
- if (grpc_tcp_trace.enabled()) {
- const char* str = grpc_error_string(why);
- gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str);
- }
- tcp->shutting_down = true;
- uv_shutdown_t* req = &tcp->shutdown_req;
- uv_shutdown(req, (uv_stream_t*)tcp->handle, shutdown_callback);
- grpc_resource_user_shutdown(tcp->resource_user);
+static void uv_tc_on_connect(uv_connect_t* req, int status) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ grpc_error* error;
+ if (status == UV_ECANCELED) {
+ // This should only happen if the handle is already closed
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timeout occurred");
+ } else {
+ error = tcp_error_create("Failed to connect to remote host", status);
}
- GRPC_ERROR_UNREF(why);
+ uv_socket->connect_cb(socket, error);
}
-static void uv_destroy(grpc_endpoint* ep) {
- grpc_network_status_unregister_endpoint(ep);
- grpc_tcp* tcp = (grpc_tcp*)ep;
- uv_close((uv_handle_t*)tcp->handle, uv_close_callback);
+static void uv_socket_connect(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, size_t len,
+ grpc_custom_connect_callback connect_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->connect_cb = connect_cb;
+ uv_socket->connect_req.data = socket;
+ int status = uv_tcp_connect(&uv_socket->connect_req, uv_socket->handle,
+ (struct sockaddr*)addr, uv_tc_on_connect);
+ if (status != 0) {
+ // The callback will not be called
+ uv_socket->connect_cb(socket, tcp_error_create("connect failed", status));
+ }
+}
+
+static grpc_resolved_addresses* handle_addrinfo_result(
+ struct addrinfo* result) {
+ struct addrinfo* resp;
+ struct addrinfo* prev;
+ size_t i;
+ grpc_resolved_addresses* addresses =
+ (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses));
+ addresses->naddrs = 0;
+ for (resp = result; resp != nullptr; resp = resp->ai_next) {
+ addresses->naddrs++;
+ }
+ addresses->addrs = (grpc_resolved_address*)gpr_malloc(
+ sizeof(grpc_resolved_address) * addresses->naddrs);
+ i = 0;
+ resp = result;
+ while (resp != nullptr) {
+ memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
+ addresses->addrs[i].len = resp->ai_addrlen;
+ i++;
+ prev = resp;
+ resp = resp->ai_next;
+ gpr_free(prev);
+ }
+ return addresses;
}
-static char* uv_get_peer(grpc_endpoint* ep) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- return gpr_strdup(tcp->peer_string);
+static void uv_resolve_callback(uv_getaddrinfo_t* req, int status,
+ struct addrinfo* res) {
+ grpc_custom_resolver* r = (grpc_custom_resolver*)req->data;
+ gpr_free(req);
+ grpc_resolved_addresses* result = nullptr;
+ if (status == 0) {
+ result = handle_addrinfo_result(res);
+ }
+ grpc_custom_resolve_callback(r, result,
+ tcp_error_create("getaddrinfo failed", status));
}
-static grpc_resource_user* uv_get_resource_user(grpc_endpoint* ep) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- return tcp->resource_user;
+static grpc_error* uv_resolve(char* host, char* port,
+ grpc_resolved_addresses** result) {
+ int status;
+ uv_getaddrinfo_t req;
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
+ hints.ai_socktype = SOCK_STREAM; /* stream socket */
+ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ status = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
+ if (status != 0) {
+ *result = nullptr;
+ } else {
+ *result = handle_addrinfo_result(req.addrinfo);
+ }
+ return tcp_error_create("getaddrinfo failed", status);
}
-static int uv_get_fd(grpc_endpoint* ep) { return -1; }
-
-static grpc_endpoint_vtable vtable = {uv_endpoint_read,
- uv_endpoint_write,
- uv_add_to_pollset,
- uv_add_to_pollset_set,
- uv_delete_from_pollset_set,
- uv_endpoint_shutdown,
- uv_destroy,
- uv_get_resource_user,
- uv_get_peer,
- uv_get_fd};
-
-grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle,
- grpc_resource_quota* resource_quota,
- char* peer_string) {
- grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
- grpc_core::ExecCtx exec_ctx;
-
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
+static void uv_resolve_async(grpc_custom_resolver* r, char* host, char* port) {
+ int status;
+ uv_getaddrinfo_t* req =
+ (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t));
+ req->data = r;
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = GRPC_AF_UNSPEC; /* ipv4 or ipv6 */
+ hints.ai_socktype = GRPC_SOCK_STREAM; /* stream socket */
+ hints.ai_flags = GRPC_AI_PASSIVE; /* for wildcard IP address */
+ status = uv_getaddrinfo(uv_default_loop(), req, uv_resolve_callback, host,
+ port, &hints);
+ if (status != 0) {
+ gpr_free(req);
+ grpc_error* error = tcp_error_create("getaddrinfo failed", status);
+ grpc_custom_resolve_callback(r, NULL, error);
}
+}
- /* Disable Nagle's Algorithm */
- uv_tcp_nodelay(handle, 1);
-
- memset(tcp, 0, sizeof(grpc_tcp));
- tcp->base.vtable = &vtable;
- tcp->handle = handle;
- handle->data = tcp;
- gpr_ref_init(&tcp->refcount, 1);
- tcp->peer_string = gpr_strdup(peer_string);
- tcp->shutting_down = false;
- tcp->read_slices = NULL;
- tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
- grpc_resource_user_slice_allocator_init(
- &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
- /* Tell network status tracking code about the new endpoint */
- grpc_network_status_register_endpoint(&tcp->base);
-
-#ifndef GRPC_UV_TCP_HOLD_LOOP
- uv_unref((uv_handle_t*)handle);
-#endif
+grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async};
- return &tcp->base;
-}
+grpc_socket_vtable grpc_uv_socket_vtable = {
+ uv_socket_init, uv_socket_connect, uv_socket_destroy,
+ uv_socket_shutdown, uv_socket_close, uv_socket_write,
+ uv_socket_read, uv_socket_getpeername, uv_socket_getsockname,
+ uv_socket_setsockopt, uv_socket_bind, uv_socket_listen,
+ uv_socket_accept};
-#endif /* GRPC_UV */
+#endif
diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h
deleted file mode 100644
index 6b1a6f77c2..0000000000
--- a/src/core/lib/iomgr/tcp_uv.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_TCP_UV_H
-#define GRPC_CORE_LIB_IOMGR_TCP_UV_H
-/*
- Low level TCP "bottom half" implementation, for use by transports built on
- top of a TCP connection.
-
- Note that this file does not (yet) include APIs for creating the socket in
- the first place.
-
- All calls passing slice transfer ownership of a slice refcount unless
- otherwise specified.
-*/
-
-#include <grpc/support/port_platform.h>
-
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/endpoint.h"
-
-#include "src/core/lib/iomgr/port.h"
-
-#ifdef GRPC_UV
-
-#include <uv.h>
-
-extern grpc_core::TraceFlag grpc_tcp_trace;
-
-#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
-
-grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle,
- grpc_resource_quota* resource_quota,
- char* peer_string);
-
-#endif /* GRPC_UV */
-
-#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */
diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc
index aab8edc888..04e6f11eee 100644
--- a/src/core/lib/iomgr/tcp_windows.cc
+++ b/src/core/lib/iomgr/tcp_windows.cc
@@ -51,7 +51,7 @@
#define GRPC_FIONBIO FIONBIO
#endif
-grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
+extern grpc_core::TraceFlag grpc_tcp_trace;
static grpc_error* set_non_block(SOCKET sock) {
int status;
diff --git a/src/core/lib/iomgr/timer.cc b/src/core/lib/iomgr/timer.cc
new file mode 100644
index 0000000000..e647cdefa0
--- /dev/null
+++ b/src/core/lib/iomgr/timer.cc
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/iomgr/timer_manager.h"
+
+grpc_timer_vtable* grpc_timer_impl;
+
+void grpc_set_timer_impl(grpc_timer_vtable* vtable) {
+ grpc_timer_impl = vtable;
+}
+
+void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
+ grpc_closure* closure) {
+ grpc_timer_impl->init(timer, deadline, closure);
+}
+
+void grpc_timer_cancel(grpc_timer* timer) { grpc_timer_impl->cancel(timer); }
+
+grpc_timer_check_result grpc_timer_check(grpc_millis* next) {
+ return grpc_timer_impl->check(next);
+}
+
+void grpc_timer_list_init() { grpc_timer_impl->list_init(); }
+
+void grpc_timer_list_shutdown() { grpc_timer_impl->list_shutdown(); }
+
+void grpc_timer_consume_kick() { grpc_timer_impl->consume_kick(); }
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index 67f1b1b3f9..5ff10d3aee 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -23,17 +23,41 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_UV
-#include "src/core/lib/iomgr/timer_uv.h"
-#else
-#include "src/core/lib/iomgr/timer_generic.h"
-#endif /* GRPC_UV */
-
#include <grpc/support/time.h>
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
-typedef struct grpc_timer grpc_timer;
+typedef struct grpc_timer {
+ gpr_atm deadline;
+ uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */
+ bool pending;
+ struct grpc_timer* next;
+ struct grpc_timer* prev;
+ grpc_closure* closure;
+#ifndef NDEBUG
+ struct grpc_timer* hash_table_next;
+#endif
+
+ // Optional field used by custom timers
+ void* custom_timer;
+} grpc_timer;
+
+typedef enum {
+ GRPC_TIMERS_NOT_CHECKED,
+ GRPC_TIMERS_CHECKED_AND_EMPTY,
+ GRPC_TIMERS_FIRED,
+} grpc_timer_check_result;
+
+typedef struct grpc_timer_vtable {
+ void (*init)(grpc_timer* timer, grpc_millis, grpc_closure* closure);
+ void (*cancel)(grpc_timer* timer);
+
+ /* Internal API */
+ grpc_timer_check_result (*check)(grpc_millis* next);
+ void (*list_init)();
+ void (*list_shutdown)(void);
+ void (*consume_kick)(void);
+} grpc_timer_vtable;
/* Initialize *timer. When expired or canceled, closure will be called with
error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled
@@ -78,12 +102,6 @@ void grpc_timer_cancel(grpc_timer* timer);
/* iomgr internal api for dealing with timers */
-typedef enum {
- GRPC_TIMERS_NOT_CHECKED,
- GRPC_TIMERS_CHECKED_AND_EMPTY,
- GRPC_TIMERS_FIRED,
-} grpc_timer_check_result;
-
/* Check for timers to be run, and run them.
Return true if timer callbacks were executed.
If next is non-null, TRY to update *next with the next running timer
@@ -99,7 +117,9 @@ void grpc_timer_list_shutdown();
void grpc_timer_consume_kick(void);
/* the following must be implemented by each iomgr implementation */
-
void grpc_kick_poller(void);
+/* Sets the timer implementation */
+void grpc_set_timer_impl(grpc_timer_vtable* vtable);
+
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_H */
diff --git a/src/core/lib/iomgr/timer_custom.cc b/src/core/lib/iomgr/timer_custom.cc
new file mode 100644
index 0000000000..71d825ff9f
--- /dev/null
+++ b/src/core/lib/iomgr/timer_custom.cc
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/iomgr/timer_custom.h"
+
+static grpc_custom_timer_vtable* custom_timer_impl;
+
+void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ grpc_core::ExecCtx exec_ctx;
+ grpc_timer* timer = t->original;
+ GPR_ASSERT(timer->pending);
+ timer->pending = 0;
+ GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
+ custom_timer_impl->stop(t);
+ gpr_free(t);
+}
+
+static void timer_init(grpc_timer* timer, grpc_millis deadline,
+ grpc_closure* closure) {
+ uint64_t timeout;
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ grpc_millis now = grpc_core::ExecCtx::Get()->Now();
+ if (deadline <= grpc_core::ExecCtx::Get()->Now()) {
+ GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
+ timer->pending = false;
+ return;
+ } else {
+ timeout = deadline - now;
+ }
+ timer->pending = true;
+ timer->closure = closure;
+ grpc_custom_timer* timer_wrapper =
+ (grpc_custom_timer*)gpr_malloc(sizeof(grpc_custom_timer));
+ timer_wrapper->timeout_ms = timeout;
+ timer->custom_timer = (void*)timer_wrapper;
+ timer_wrapper->original = timer;
+ custom_timer_impl->start(timer_wrapper);
+}
+
+static void timer_cancel(grpc_timer* timer) {
+ GRPC_CUSTOM_IOMGR_ASSERT_SAME_THREAD();
+ grpc_custom_timer* tw = (grpc_custom_timer*)timer->custom_timer;
+ if (timer->pending) {
+ timer->pending = 0;
+ GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
+ custom_timer_impl->stop(tw);
+ gpr_free(tw);
+ }
+}
+
+static grpc_timer_check_result timer_check(grpc_millis* next) {
+ return GRPC_TIMERS_NOT_CHECKED;
+}
+
+static void timer_list_init() {}
+static void timer_list_shutdown() {}
+
+static void timer_consume_kick(void) {}
+
+static grpc_timer_vtable custom_timer_vtable = {
+ timer_init, timer_cancel, timer_check,
+ timer_list_init, timer_list_shutdown, timer_consume_kick};
+
+void grpc_custom_timer_init(grpc_custom_timer_vtable* impl) {
+ custom_timer_impl = impl;
+ grpc_set_timer_impl(&custom_timer_vtable);
+}
diff --git a/src/core/lib/iomgr/timer_custom.h b/src/core/lib/iomgr/timer_custom.h
new file mode 100644
index 0000000000..bfea8bafa6
--- /dev/null
+++ b/src/core/lib/iomgr/timer_custom.h
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_TIMER_CUSTOM_H
+#define GRPC_CORE_LIB_IOMGR_TIMER_CUSTOM_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/iomgr/timer.h"
+
+typedef struct grpc_custom_timer {
+ // Implementation defined
+ void* timer;
+ uint64_t timeout_ms;
+
+ grpc_timer* original;
+} grpc_custom_timer;
+
+typedef struct grpc_custom_timer_vtable {
+ void (*start)(grpc_custom_timer* t);
+ void (*stop)(grpc_custom_timer* t);
+} grpc_custom_timer_vtable;
+
+void grpc_custom_timer_init(grpc_custom_timer_vtable* impl);
+
+void grpc_custom_timer_callback(grpc_custom_timer* t, grpc_error* error);
+
+#endif /* GRPC_CORE_LIB_IOMGR_TIMER_CUSTOM_H */
diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc
index 52a571f425..93e654b7fa 100644
--- a/src/core/lib/iomgr/timer_generic.cc
+++ b/src/core/lib/iomgr/timer_generic.cc
@@ -22,8 +22,6 @@
#include <inttypes.h>
-#ifdef GRPC_TIMER_USE_GENERIC
-
#include "src/core/lib/iomgr/timer.h"
#include <grpc/support/alloc.h>
@@ -238,7 +236,7 @@ static gpr_atm compute_min_deadline(timer_shard* shard) {
: grpc_timer_heap_top(&shard->heap)->deadline;
}
-void grpc_timer_list_init() {
+static void timer_list_init() {
uint32_t i;
g_num_shards = GPR_MIN(1, 2 * gpr_cpu_num_cores());
@@ -270,7 +268,7 @@ void grpc_timer_list_init() {
INIT_TIMER_HASH_TABLE();
}
-void grpc_timer_list_shutdown() {
+static void timer_list_shutdown() {
size_t i;
run_some_expired_timers(
GPR_ATM_MAX, nullptr,
@@ -326,8 +324,8 @@ static void note_deadline_change(timer_shard* shard) {
void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = false; }
-void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
- grpc_closure* closure) {
+static void timer_init(grpc_timer* timer, grpc_millis deadline,
+ grpc_closure* closure) {
int is_first_timer = 0;
timer_shard* shard = &g_shards[GPR_HASH_POINTER(timer, g_num_shards)];
timer->closure = closure;
@@ -412,12 +410,12 @@ void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
}
}
-void grpc_timer_consume_kick(void) {
+static void timer_consume_kick(void) {
/* force re-evaluation of last seeen min */
gpr_tls_set(&g_last_seen_min_timer, 0);
}
-void grpc_timer_cancel(grpc_timer* timer) {
+static void timer_cancel(grpc_timer* timer) {
if (!g_shared_mutables.initialized) {
/* must have already been cancelled, also the shard mutex is invalid */
return;
@@ -604,7 +602,7 @@ static grpc_timer_check_result run_some_expired_timers(gpr_atm now,
return result;
}
-grpc_timer_check_result grpc_timer_check(grpc_millis* next) {
+static grpc_timer_check_result timer_check(grpc_millis* next) {
// prelude
grpc_millis now = grpc_core::ExecCtx::Get()->Now();
@@ -660,4 +658,6 @@ grpc_timer_check_result grpc_timer_check(grpc_millis* next) {
return r;
}
-#endif /* GRPC_TIMER_USE_GENERIC */
+grpc_timer_vtable grpc_generic_timer_vtable = {
+ timer_init, timer_cancel, timer_check,
+ timer_list_init, timer_list_shutdown, timer_consume_kick};
diff --git a/src/core/lib/iomgr/timer_heap.cc b/src/core/lib/iomgr/timer_heap.cc
index e5b5abfc97..0c17d607eb 100644
--- a/src/core/lib/iomgr/timer_heap.cc
+++ b/src/core/lib/iomgr/timer_heap.cc
@@ -20,8 +20,6 @@
#include "src/core/lib/iomgr/port.h"
-#ifdef GRPC_TIMER_USE_GENERIC
-
#include "src/core/lib/iomgr/timer_heap.h"
#include <string.h>
@@ -135,5 +133,3 @@ grpc_timer* grpc_timer_heap_top(grpc_timer_heap* heap) {
void grpc_timer_heap_pop(grpc_timer_heap* heap) {
grpc_timer_heap_remove(heap, grpc_timer_heap_top(heap));
}
-
-#endif /* GRPC_TIMER_USE_GENERIC */
diff --git a/src/core/lib/iomgr/timer_uv.cc b/src/core/lib/iomgr/timer_uv.cc
index 6f28f553c5..dadeb960b2 100644
--- a/src/core/lib/iomgr/timer_uv.cc
+++ b/src/core/lib/iomgr/timer_uv.cc
@@ -20,20 +20,18 @@
#include "src/core/lib/iomgr/port.h"
-#if GRPC_UV
+#ifdef GRPC_UV
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/iomgr_uv.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/iomgr/timer_custom.h"
#include <uv.h>
-grpc_core::TraceFlag grpc_timer_trace(false, "timer");
-grpc_core::TraceFlag grpc_timer_check_trace(false, "timer_check");
-
static void timer_close_callback(uv_handle_t* handle) { gpr_free(handle); }
static void stop_uv_timer(uv_timer_t* handle) {
@@ -43,57 +41,23 @@ static void stop_uv_timer(uv_timer_t* handle) {
}
void run_expired_timer(uv_timer_t* handle) {
- grpc_timer* timer = (grpc_timer*)handle->data;
- grpc_core::ExecCtx exec_ctx;
- GRPC_UV_ASSERT_SAME_THREAD();
- GPR_ASSERT(timer->pending);
- timer->pending = 0;
- GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
- stop_uv_timer(handle);
+ grpc_custom_timer* timer_wrapper = (grpc_custom_timer*)handle->data;
+ grpc_custom_timer_callback(timer_wrapper, GRPC_ERROR_NONE);
}
-void grpc_timer_init(grpc_timer* timer, grpc_millis deadline,
- grpc_closure* closure) {
- uint64_t timeout;
+static void timer_start(grpc_custom_timer* t) {
uv_timer_t* uv_timer;
- GRPC_UV_ASSERT_SAME_THREAD();
- timer->closure = closure;
- if (deadline <= grpc_core::ExecCtx::Get()->Now()) {
- timer->pending = 0;
- GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_NONE);
- return;
- }
- timer->pending = 1;
- timeout = (uint64_t)(deadline - grpc_core::ExecCtx::Get()->Now());
uv_timer = (uv_timer_t*)gpr_malloc(sizeof(uv_timer_t));
uv_timer_init(uv_default_loop(), uv_timer);
- uv_timer->data = timer;
- timer->uv_timer = uv_timer;
- uv_timer_start(uv_timer, run_expired_timer, timeout, 0);
- /* We assume that gRPC timers are only used alongside other active gRPC
- objects, and that there will therefore always be something else keeping
- the uv loop alive whenever there is a timer */
- uv_unref((uv_handle_t*)uv_timer);
+ uv_timer->data = t;
+ t->timer = (void*)uv_timer;
+ uv_timer_start(uv_timer, run_expired_timer, t->timeout_ms, 0);
}
-void grpc_timer_init_unset(grpc_timer* timer) { timer->pending = 0; }
-
-void grpc_timer_cancel(grpc_timer* timer) {
- GRPC_UV_ASSERT_SAME_THREAD();
- if (timer->pending) {
- timer->pending = 0;
- GRPC_CLOSURE_SCHED(timer->closure, GRPC_ERROR_CANCELLED);
- stop_uv_timer((uv_timer_t*)timer->uv_timer);
- }
+static void timer_stop(grpc_custom_timer* t) {
+ stop_uv_timer((uv_timer_t*)t->timer);
}
-grpc_timer_check_result grpc_timer_check(grpc_millis* next) {
- return GRPC_TIMERS_NOT_CHECKED;
-}
-
-void grpc_timer_list_init() {}
-void grpc_timer_list_shutdown() {}
-
-void grpc_timer_consume_kick(void) {}
+grpc_custom_timer_vtable uv_timer_vtable = {timer_start, timer_stop};
-#endif /* GRPC_UV */
+#endif
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index e739a5df93..15a242abe2 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -345,7 +345,7 @@ static int bind_socket(grpc_socket_factory* socket_factory, int sockfd,
return (socket_factory != nullptr)
? grpc_socket_factory_bind(socket_factory, sockfd, addr)
: bind(sockfd,
- reinterpret_cast<struct sockaddr*>(
+ reinterpret_cast<grpc_sockaddr*>(
const_cast<char*>(addr->addr)),
static_cast<socklen_t>(addr->len));
}
@@ -355,8 +355,8 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
const grpc_resolved_address* addr, int rcv_buf_size,
int snd_buf_size) {
grpc_resolved_address sockname_temp;
- struct sockaddr* addr_ptr =
- reinterpret_cast<struct sockaddr*>(const_cast<char*>(addr->addr));
+ grpc_sockaddr* addr_ptr =
+ reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr));
if (fd < 0) {
goto error;
@@ -392,7 +392,7 @@ static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
sockname_temp.len = sizeof(struct sockaddr_storage);
- if (getsockname(fd, reinterpret_cast<struct sockaddr*>(sockname_temp.addr),
+ if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
reinterpret_cast<socklen_t*>(&sockname_temp.len)) < 0) {
goto error;
}
@@ -576,10 +576,9 @@ int grpc_udp_server_add_port(grpc_udp_server* s,
if (grpc_sockaddr_get_port(addr) == 0) {
for (size_t i = 0; i < s->listeners.size(); ++i) {
sockname_temp.len = sizeof(struct sockaddr_storage);
- if (0 ==
- getsockname(s->listeners[i].fd(),
- reinterpret_cast<struct sockaddr*>(sockname_temp.addr),
- reinterpret_cast<socklen_t*>(&sockname_temp.len))) {
+ if (0 == getsockname(s->listeners[i].fd(),
+ reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
+ reinterpret_cast<socklen_t*>(&sockname_temp.len))) {
port = grpc_sockaddr_get_port(&sockname_temp);
if (port > 0) {
allocated_addr = static_cast<grpc_resolved_address*>(
diff --git a/src/core/lib/iomgr/unix_sockets_posix.cc b/src/core/lib/iomgr/unix_sockets_posix.cc
index 8d252fd331..5d09b4a9b1 100644
--- a/src/core/lib/iomgr/unix_sockets_posix.cc
+++ b/src/core/lib/iomgr/unix_sockets_posix.cc
@@ -66,15 +66,15 @@ grpc_error* grpc_resolve_unix_domain_address(const char* name,
}
int grpc_is_unix_socket(const grpc_resolved_address* resolved_addr) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
return addr->sa_family == AF_UNIX;
}
void grpc_unlink_if_unix_domain_socket(
const grpc_resolved_address* resolved_addr) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
if (addr->sa_family != AF_UNIX) {
return;
}
@@ -89,8 +89,8 @@ void grpc_unlink_if_unix_domain_socket(
char* grpc_sockaddr_to_uri_unix_if_possible(
const grpc_resolved_address* resolved_addr) {
- const struct sockaddr* addr =
- reinterpret_cast<const struct sockaddr*>(resolved_addr->addr);
+ const grpc_sockaddr* addr =
+ reinterpret_cast<const grpc_sockaddr*>(resolved_addr->addr);
if (addr->sa_family != AF_UNIX) {
return nullptr;
}
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 994443c651..d96cbec292 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -97,6 +97,8 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/gethostname_sysconf.cc',
'src/core/lib/iomgr/iocp_windows.cc',
'src/core/lib/iomgr/iomgr.cc',
+ 'src/core/lib/iomgr/iomgr_custom.cc',
+ 'src/core/lib/iomgr/iomgr_internal.cc',
'src/core/lib/iomgr/iomgr_posix.cc',
'src/core/lib/iomgr/iomgr_uv.cc',
'src/core/lib/iomgr/iomgr_windows.cc',
@@ -105,12 +107,16 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/lockfree_event.cc',
'src/core/lib/iomgr/network_status_tracker.cc',
'src/core/lib/iomgr/polling_entity.cc',
- 'src/core/lib/iomgr/pollset_set_uv.cc',
+ 'src/core/lib/iomgr/pollset.cc',
+ 'src/core/lib/iomgr/pollset_custom.cc',
+ 'src/core/lib/iomgr/pollset_set.cc',
+ 'src/core/lib/iomgr/pollset_set_custom.cc',
'src/core/lib/iomgr/pollset_set_windows.cc',
'src/core/lib/iomgr/pollset_uv.cc',
'src/core/lib/iomgr/pollset_windows.cc',
+ 'src/core/lib/iomgr/resolve_address.cc',
+ 'src/core/lib/iomgr/resolve_address_custom.cc',
'src/core/lib/iomgr/resolve_address_posix.cc',
- 'src/core/lib/iomgr/resolve_address_uv.cc',
'src/core/lib/iomgr/resolve_address_windows.cc',
'src/core/lib/iomgr/resource_quota.cc',
'src/core/lib/iomgr/sockaddr_utils.cc',
@@ -122,19 +128,24 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/socket_utils_uv.cc',
'src/core/lib/iomgr/socket_utils_windows.cc',
'src/core/lib/iomgr/socket_windows.cc',
+ 'src/core/lib/iomgr/tcp_client.cc',
+ 'src/core/lib/iomgr/tcp_client_custom.cc',
'src/core/lib/iomgr/tcp_client_posix.cc',
- 'src/core/lib/iomgr/tcp_client_uv.cc',
'src/core/lib/iomgr/tcp_client_windows.cc',
+ 'src/core/lib/iomgr/tcp_custom.cc',
'src/core/lib/iomgr/tcp_posix.cc',
+ 'src/core/lib/iomgr/tcp_server.cc',
+ 'src/core/lib/iomgr/tcp_server_custom.cc',
'src/core/lib/iomgr/tcp_server_posix.cc',
'src/core/lib/iomgr/tcp_server_utils_posix_common.cc',
'src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.cc',
'src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.cc',
- 'src/core/lib/iomgr/tcp_server_uv.cc',
'src/core/lib/iomgr/tcp_server_windows.cc',
'src/core/lib/iomgr/tcp_uv.cc',
'src/core/lib/iomgr/tcp_windows.cc',
'src/core/lib/iomgr/time_averaged_stats.cc',
+ 'src/core/lib/iomgr/timer.cc',
+ 'src/core/lib/iomgr/timer_custom.cc',
'src/core/lib/iomgr/timer_generic.cc',
'src/core/lib/iomgr/timer_heap.cc',
'src/core/lib/iomgr/timer_manager.cc',