aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_uv.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_uv.cc')
-rw-r--r--src/core/lib/iomgr/tcp_uv.cc627
1 files changed, 313 insertions, 314 deletions
diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc
index 6db3217d6e..5e3166926b 100644
--- a/src/core/lib/iomgr/tcp_uv.cc
+++ b/src/core/lib/iomgr/tcp_uv.cc
@@ -21,7 +21,6 @@
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_UV
-
#include <limits.h>
#include <string.h>
@@ -33,393 +32,393 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/error.h"
-#include "src/core/lib/iomgr/iomgr_uv.h"
+#include "src/core/lib/iomgr/iomgr_custom.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
+#include "src/core/lib/iomgr/resolve_address_custom.h"
#include "src/core/lib/iomgr/resource_quota.h"
-#include "src/core/lib/iomgr/tcp_uv.h"
+#include "src/core/lib/iomgr/tcp_custom.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
-grpc_core::TraceFlag grpc_tcp_trace(false, "tcp");
+#include <uv.h>
-typedef struct {
- grpc_endpoint base;
- gpr_refcount refcount;
+#define IGNORE_CONST(addr) ((grpc_sockaddr*)(uintptr_t)(addr))
+typedef struct uv_socket_t {
+ uv_connect_t connect_req;
uv_write_t write_req;
uv_shutdown_t shutdown_req;
-
uv_tcp_t* handle;
-
- grpc_closure* read_cb;
- grpc_closure* write_cb;
-
- grpc_slice_buffer* read_slices;
- grpc_slice_buffer* write_slices;
uv_buf_t* write_buffers;
- grpc_resource_user* resource_user;
- grpc_resource_user_slice_allocator slice_allocator;
-
- bool shutting_down;
+ char* read_buf;
+ size_t read_len;
- char* peer_string;
- grpc_pollset* pollset;
-} grpc_tcp;
+ bool pending_connection;
+ grpc_custom_socket* accept_socket;
+ grpc_error* accept_error;
-static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) {
- return grpc_error_set_str(
- grpc_error_set_int(
- src_error,
- /* All tcp errors are marked with UNAVAILABLE so that application may
- * choose to retry. */
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
- GRPC_ERROR_STR_TARGET_ADDRESS,
- grpc_slice_from_copied_string(tcp->peer_string));
-}
+ grpc_custom_connect_callback connect_cb;
+ grpc_custom_write_callback write_cb;
+ grpc_custom_read_callback read_cb;
+ grpc_custom_accept_callback accept_cb;
+ grpc_custom_close_callback close_cb;
-static void tcp_free(grpc_tcp* tcp) {
- grpc_resource_user_unref(tcp->resource_user);
- gpr_free(tcp->handle);
- gpr_free(tcp->peer_string);
- gpr_free(tcp);
-}
-
-#ifndef NDEBUG
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
-#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
- int line) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
- val - 1);
- }
- if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
- }
-}
+} uv_socket_t;
-static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
- int line) {
- if (grpc_tcp_trace.enabled()) {
- gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
- gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
- "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val,
- val + 1);
- }
- gpr_ref(&tcp->refcount);
-}
-#else
-#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
-#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_tcp* tcp) {
- if (gpr_unref(&tcp->refcount)) {
- tcp_free(tcp);
+static grpc_error* tcp_error_create(const char* desc, int status) {
+ if (status == 0) {
+ return GRPC_ERROR_NONE;
}
+ grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(desc);
+ /* All tcp errors are marked with UNAVAILABLE so that application may
+ * choose to retry. */
+ error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ return grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
+ grpc_slice_from_static_string(uv_strerror(status)));
}
-static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
-#endif
-
-static void uv_close_callback(uv_handle_t* handle) {
- grpc_core::ExecCtx exec_ctx;
- grpc_tcp* tcp = (grpc_tcp*)handle->data;
- TCP_UNREF(tcp, "destroy");
+static void uv_socket_destroy(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ gpr_free(uv_socket->handle);
+ gpr_free(uv_socket);
}
static void alloc_uv_buf(uv_handle_t* handle, size_t suggested_size,
uv_buf_t* buf) {
- grpc_core::ExecCtx exec_ctx;
- grpc_tcp* tcp = (grpc_tcp*)handle->data;
+ uv_socket_t* uv_socket =
+ (uv_socket_t*)((grpc_custom_socket*)handle->data)->impl;
(void)suggested_size;
- /* Before calling uv_read_start, we allocate a buffer with exactly one slice
- * to tcp->read_slices and wait for the callback indicating that the
- * allocation was successful. So slices[0] should always exist here */
- buf->base = (char*)GRPC_SLICE_START_PTR(tcp->read_slices->slices[0]);
- buf->len = GRPC_SLICE_LENGTH(tcp->read_slices->slices[0]);
-}
-
-static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
- grpc_closure* cb = tcp->read_cb;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg);
- size_t i;
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "read: error=%s", str);
-
- for (i = 0; i < tcp->read_slices->count; i++) {
- char* dump = grpc_dump_slice(tcp->read_slices->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
- gpr_free(dump);
- }
- }
- tcp->read_slices = NULL;
- tcp->read_cb = NULL;
- GRPC_CLOSURE_RUN(cb, error);
+ buf->base = uv_socket->read_buf;
+ buf->len = uv_socket->read_len;
}
-static void read_callback(uv_stream_t* stream, ssize_t nread,
- const uv_buf_t* buf) {
- grpc_error* error;
- grpc_core::ExecCtx exec_ctx;
- grpc_tcp* tcp = (grpc_tcp*)stream->data;
- grpc_slice_buffer garbage;
+static void uv_read_callback(uv_stream_t* stream, ssize_t nread,
+ const uv_buf_t* buf) {
+ grpc_error* error = GRPC_ERROR_NONE;
if (nread == 0) {
// Nothing happened. Wait for the next callback
return;
}
- TCP_UNREF(tcp, "read");
// TODO(murgatroid99): figure out what the return value here means
uv_read_stop(stream);
if (nread == UV_EOF) {
- error =
- tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp);
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- } else if (nread > 0) {
- // Successful read
- error = GRPC_ERROR_NONE;
- if ((size_t)nread < tcp->read_slices->length) {
- /* TODO(murgatroid99): Instead of discarding the unused part of the read
- * buffer, reuse it as the next read buffer. */
- grpc_slice_buffer_init(&garbage);
- grpc_slice_buffer_trim_end(
- tcp->read_slices, tcp->read_slices->length - (size_t)nread, &garbage);
- grpc_slice_buffer_reset_and_unref_internal(&garbage);
- }
- } else {
- // nread < 0: Error
- error = tcp_annotate_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"), tcp);
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF");
+ } else if (nread < 0) {
+ error = tcp_error_create("TCP Read failed", nread);
}
- call_read_cb(tcp, error);
+ grpc_custom_socket* socket = (grpc_custom_socket*)stream->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->read_cb(socket, (size_t)nread, error);
}
-static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
- int status;
- grpc_tcp* tcp = (grpc_tcp*)tcpp;
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp,
- grpc_error_string(error));
- }
- if (error == GRPC_ERROR_NONE) {
- status =
- uv_read_start((uv_stream_t*)tcp->handle, alloc_uv_buf, read_callback);
- if (status != 0) {
- error = tcp_annotate_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"),
- tcp);
- error = grpc_error_set_str(
- error, GRPC_ERROR_STR_OS_ERROR,
- grpc_slice_from_static_string(uv_strerror(status)));
- }
- }
- if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref_internal(tcp->read_slices);
- call_read_cb(tcp, GRPC_ERROR_REF(error));
- TCP_UNREF(tcp, "read");
+static void uv_close_callback(uv_handle_t* handle) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)handle->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ if (uv_socket->accept_socket) {
+ uv_socket->accept_cb(socket, uv_socket->accept_socket,
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("socket closed"));
}
- if (grpc_tcp_trace.enabled()) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str);
+ uv_socket->close_cb(socket);
+}
+
+static void uv_socket_read(grpc_custom_socket* socket, char* buffer,
+ size_t length, grpc_custom_read_callback read_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int status;
+ grpc_error* error;
+ uv_socket->read_cb = read_cb;
+ uv_socket->read_buf = buffer;
+ uv_socket->read_len = length;
+ // TODO(murgatroid99): figure out what the return value here means
+ status =
+ uv_read_start((uv_stream_t*)uv_socket->handle, (uv_alloc_cb)alloc_uv_buf,
+ (uv_read_cb)uv_read_callback);
+ if (status != 0) {
+ error = tcp_error_create("TCP Read failed at start", status);
+ uv_socket->read_cb(socket, 0, error);
}
}
-static void uv_endpoint_read(grpc_endpoint* ep, grpc_slice_buffer* read_slices,
- grpc_closure* cb) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- GRPC_UV_ASSERT_SAME_THREAD();
- GPR_ASSERT(tcp->read_cb == NULL);
- tcp->read_cb = cb;
- tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref_internal(read_slices);
- TCP_REF(tcp, "read");
- grpc_resource_user_alloc_slices(&tcp->slice_allocator,
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
- tcp->read_slices);
+static void uv_write_callback(uv_write_t* req, int status) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ gpr_free(uv_socket->write_buffers);
+ uv_socket->write_cb(socket, tcp_error_create("TCP Write failed", status));
}
-static void write_callback(uv_write_t* req, int status) {
- grpc_tcp* tcp = (grpc_tcp*)req->data;
- grpc_error* error;
- grpc_core::ExecCtx exec_ctx;
- grpc_closure* cb = tcp->write_cb;
- tcp->write_cb = NULL;
- TCP_UNREF(tcp, "write");
- if (status == 0) {
- error = GRPC_ERROR_NONE;
- } else {
- error = tcp_annotate_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Write failed"), tcp);
- }
- if (grpc_tcp_trace.enabled()) {
- const char* str = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str);
+void uv_socket_write(grpc_custom_socket* socket,
+ grpc_slice_buffer* write_slices,
+ grpc_custom_write_callback write_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->write_cb = write_cb;
+ uv_buf_t* uv_buffers;
+ uv_write_t* write_req;
+
+ uv_buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * write_slices->count);
+ for (size_t i = 0; i < write_slices->count; i++) {
+ uv_buffers[i].base = (char*)GRPC_SLICE_START_PTR(write_slices->slices[i]);
+ uv_buffers[i].len = GRPC_SLICE_LENGTH(write_slices->slices[i]);
}
- gpr_free(tcp->write_buffers);
- GRPC_CLOSURE_SCHED(cb, error);
+
+ uv_socket->write_buffers = uv_buffers;
+ write_req = &uv_socket->write_req;
+ write_req->data = socket;
+ // TODO(murgatroid99): figure out what the return value here means
+ uv_write(write_req, (uv_stream_t*)uv_socket->handle, uv_buffers,
+ write_slices->count, uv_write_callback);
}
-static void uv_endpoint_write(grpc_endpoint* ep,
- grpc_slice_buffer* write_slices,
- grpc_closure* cb) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- uv_buf_t* buffers;
- unsigned int buffer_count;
- unsigned int i;
- grpc_slice* slice;
- uv_write_t* write_req;
- GRPC_UV_ASSERT_SAME_THREAD();
+static void shutdown_callback(uv_shutdown_t* req, int status) {}
- if (grpc_tcp_trace.enabled()) {
- size_t j;
+static void uv_socket_shutdown(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_shutdown_t* req = &uv_socket->shutdown_req;
+ uv_shutdown(req, (uv_stream_t*)uv_socket->handle, shutdown_callback);
+}
- for (j = 0; j < write_slices->count; j++) {
- char* data = grpc_dump_slice(write_slices->slices[j],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
- gpr_free(data);
- }
+static void uv_socket_close(grpc_custom_socket* socket,
+ grpc_custom_close_callback close_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->close_cb = close_cb;
+ uv_close((uv_handle_t*)uv_socket->handle, uv_close_callback);
+}
+
+static grpc_error* uv_socket_init_helper(uv_socket_t* uv_socket, int domain) {
+ uv_tcp_t* tcp = (uv_tcp_t*)gpr_malloc(sizeof(uv_tcp_t));
+ uv_socket->handle = tcp;
+ int status = uv_tcp_init_ex(uv_default_loop(), tcp, (unsigned int)domain);
+ if (status != 0) {
+ return tcp_error_create("Failed to initialize UV tcp handle", status);
}
+ uv_socket->write_buffers = nullptr;
+ uv_socket->read_len = 0;
+ uv_tcp_nodelay(uv_socket->handle, 1);
+ uv_socket->pending_connection = false;
+ uv_socket->accept_socket = nullptr;
+ uv_socket->accept_error = GRPC_ERROR_NONE;
+ return GRPC_ERROR_NONE;
+}
- if (tcp->shutting_down) {
- GRPC_CLOSURE_SCHED(cb,
- tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "TCP socket is shutting down"),
- tcp));
- return;
+static grpc_error* uv_socket_init(grpc_custom_socket* socket, int domain) {
+ uv_socket_t* uv_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
+ grpc_error* error = uv_socket_init_helper(uv_socket, domain);
+ if (error != GRPC_ERROR_NONE) {
+ return error;
}
+ uv_socket->handle->data = socket;
+ socket->impl = uv_socket;
+ return GRPC_ERROR_NONE;
+}
+
+static grpc_error* uv_socket_getpeername(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr,
+ int* addr_len) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int err = uv_tcp_getpeername(uv_socket->handle,
+ (struct sockaddr*)IGNORE_CONST(addr), addr_len);
+ return tcp_error_create("getpeername failed", err);
+}
+
+static grpc_error* uv_socket_getsockname(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr,
+ int* addr_len) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int err = uv_tcp_getsockname(uv_socket->handle,
+ (struct sockaddr*)IGNORE_CONST(addr), addr_len);
+ return tcp_error_create("getsockname failed", err);
+}
- GPR_ASSERT(tcp->write_cb == NULL);
- tcp->write_slices = write_slices;
- GPR_ASSERT(tcp->write_slices->count <= UINT_MAX);
- if (tcp->write_slices->count == 0) {
- // No slices means we don't have to do anything,
- // and libuv doesn't like empty writes
- GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
+static void accept_new_connection(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ if (!uv_socket->pending_connection || !uv_socket->accept_socket) {
return;
}
+ grpc_custom_socket* new_socket = uv_socket->accept_socket;
+ grpc_error* error = uv_socket->accept_error;
+ uv_socket->accept_socket = nullptr;
+ uv_socket->accept_error = GRPC_ERROR_NONE;
+ uv_socket->pending_connection = false;
+ if (uv_socket->accept_error != GRPC_ERROR_NONE) {
+ uv_stream_t dummy_handle;
+ uv_accept((uv_stream_t*)uv_socket->handle, &dummy_handle);
+ uv_socket->accept_cb(socket, new_socket, error);
+ } else {
+ uv_socket_t* uv_new_socket = (uv_socket_t*)gpr_malloc(sizeof(uv_socket_t));
+ uv_socket_init_helper(uv_new_socket, AF_UNSPEC);
+ // UV documentation says this is guaranteed to succeed
+ GPR_ASSERT(uv_accept((uv_stream_t*)uv_socket->handle,
+ (uv_stream_t*)uv_new_socket->handle) == 0);
+ new_socket->impl = uv_new_socket;
+ uv_new_socket->handle->data = new_socket;
+ uv_socket->accept_cb(socket, new_socket, error);
+ }
+}
- tcp->write_cb = cb;
- buffer_count = (unsigned int)tcp->write_slices->count;
- buffers = (uv_buf_t*)gpr_malloc(sizeof(uv_buf_t) * buffer_count);
- for (i = 0; i < buffer_count; i++) {
- slice = &tcp->write_slices->slices[i];
- buffers[i].base = (char*)GRPC_SLICE_START_PTR(*slice);
- buffers[i].len = GRPC_SLICE_LENGTH(*slice);
+static void uv_on_connect(uv_stream_t* server, int status) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)server->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ GPR_ASSERT(!uv_socket->pending_connection);
+ uv_socket->pending_connection = true;
+ if (status < 0) {
+ switch (status) {
+ case UV_EINTR:
+ case UV_EAGAIN:
+ return;
+ default:
+ uv_socket->accept_error = tcp_error_create("accept failed", status);
+ }
}
- tcp->write_buffers = buffers;
- write_req = &tcp->write_req;
- write_req->data = tcp;
- TCP_REF(tcp, "write");
- // TODO(murgatroid99): figure out what the return value here means
- uv_write(write_req, (uv_stream_t*)tcp->handle, buffers, buffer_count,
- write_callback);
+ accept_new_connection(socket);
}
-static void uv_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
- // No-op. We're ignoring pollsets currently
- (void)ep;
- (void)pollset;
- grpc_tcp* tcp = (grpc_tcp*)ep;
- tcp->pollset = pollset;
+void uv_socket_accept(grpc_custom_socket* socket,
+ grpc_custom_socket* new_socket,
+ grpc_custom_accept_callback accept_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->accept_cb = accept_cb;
+ GPR_ASSERT(uv_socket->accept_socket == nullptr);
+ uv_socket->accept_socket = new_socket;
+ accept_new_connection(socket);
}
-static void uv_add_to_pollset_set(grpc_endpoint* ep,
- grpc_pollset_set* pollset) {
- // No-op. We're ignoring pollsets currently
- (void)ep;
- (void)pollset;
+static grpc_error* uv_socket_bind(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, size_t len,
+ int flags) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int status =
+ uv_tcp_bind((uv_tcp_t*)uv_socket->handle, (struct sockaddr*)addr, 0);
+ return tcp_error_create("Failed to bind to port", status);
}
-static void uv_delete_from_pollset_set(grpc_endpoint* ep,
- grpc_pollset_set* pollset) {
- // No-op. We're ignoring pollsets currently
- (void)ep;
- (void)pollset;
+static grpc_error* uv_socket_listen(grpc_custom_socket* socket) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ int status =
+ uv_listen((uv_stream_t*)uv_socket->handle, SOMAXCONN, uv_on_connect);
+ return tcp_error_create("Failed to listen to port", status);
}
-static void shutdown_callback(uv_shutdown_t* req, int status) {}
+static grpc_error* uv_socket_setsockopt(grpc_custom_socket* socket, int level,
+ int option_name, const void* optval,
+ socklen_t option_len) {
+ int fd;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_fileno((uv_handle_t*)uv_socket->handle, &fd);
+ // TODO Handle error here. Also, does this work on windows??
+ setsockopt(fd, level, option_name, &optval, (socklen_t)option_len);
+ return GRPC_ERROR_NONE;
+}
-static void uv_endpoint_shutdown(grpc_endpoint* ep, grpc_error* why) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- if (!tcp->shutting_down) {
- if (grpc_tcp_trace.enabled()) {
- const char* str = grpc_error_string(why);
- gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str);
- }
- tcp->shutting_down = true;
- uv_shutdown_t* req = &tcp->shutdown_req;
- uv_shutdown(req, (uv_stream_t*)tcp->handle, shutdown_callback);
- grpc_resource_user_shutdown(tcp->resource_user);
+static void uv_tc_on_connect(uv_connect_t* req, int status) {
+ grpc_custom_socket* socket = (grpc_custom_socket*)req->data;
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ grpc_error* error;
+ if (status == UV_ECANCELED) {
+ // This should only happen if the handle is already closed
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Timeout occurred");
+ } else {
+ error = tcp_error_create("Failed to connect to remote host", status);
}
- GRPC_ERROR_UNREF(why);
+ uv_socket->connect_cb(socket, error);
}
-static void uv_destroy(grpc_endpoint* ep) {
- grpc_network_status_unregister_endpoint(ep);
- grpc_tcp* tcp = (grpc_tcp*)ep;
- uv_close((uv_handle_t*)tcp->handle, uv_close_callback);
+static void uv_socket_connect(grpc_custom_socket* socket,
+ const grpc_sockaddr* addr, size_t len,
+ grpc_custom_connect_callback connect_cb) {
+ uv_socket_t* uv_socket = (uv_socket_t*)socket->impl;
+ uv_socket->connect_cb = connect_cb;
+ uv_socket->connect_req.data = socket;
+ int status = uv_tcp_connect(&uv_socket->connect_req, uv_socket->handle,
+ (struct sockaddr*)addr, uv_tc_on_connect);
+ if (status != 0) {
+ // The callback will not be called
+ uv_socket->connect_cb(socket, tcp_error_create("connect failed", status));
+ }
+}
+
+static grpc_resolved_addresses* handle_addrinfo_result(
+ struct addrinfo* result) {
+ struct addrinfo* resp;
+ struct addrinfo* prev;
+ size_t i;
+ grpc_resolved_addresses* addresses =
+ (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses));
+ addresses->naddrs = 0;
+ for (resp = result; resp != nullptr; resp = resp->ai_next) {
+ addresses->naddrs++;
+ }
+ addresses->addrs = (grpc_resolved_address*)gpr_malloc(
+ sizeof(grpc_resolved_address) * addresses->naddrs);
+ i = 0;
+ resp = result;
+ while (resp != nullptr) {
+ memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen);
+ addresses->addrs[i].len = resp->ai_addrlen;
+ i++;
+ prev = resp;
+ resp = resp->ai_next;
+ gpr_free(prev);
+ }
+ return addresses;
}
-static char* uv_get_peer(grpc_endpoint* ep) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- return gpr_strdup(tcp->peer_string);
+static void uv_resolve_callback(uv_getaddrinfo_t* req, int status,
+ struct addrinfo* res) {
+ grpc_custom_resolver* r = (grpc_custom_resolver*)req->data;
+ gpr_free(req);
+ grpc_resolved_addresses* result = nullptr;
+ if (status == 0) {
+ result = handle_addrinfo_result(res);
+ }
+ grpc_custom_resolve_callback(r, result,
+ tcp_error_create("getaddrinfo failed", status));
}
-static grpc_resource_user* uv_get_resource_user(grpc_endpoint* ep) {
- grpc_tcp* tcp = (grpc_tcp*)ep;
- return tcp->resource_user;
+static grpc_error* uv_resolve(char* host, char* port,
+ grpc_resolved_addresses** result) {
+ int status;
+ uv_getaddrinfo_t req;
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */
+ hints.ai_socktype = SOCK_STREAM; /* stream socket */
+ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */
+ status = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
+ if (status != 0) {
+ *result = nullptr;
+ } else {
+ *result = handle_addrinfo_result(req.addrinfo);
+ }
+ return tcp_error_create("getaddrinfo failed", status);
}
-static int uv_get_fd(grpc_endpoint* ep) { return -1; }
-
-static grpc_endpoint_vtable vtable = {uv_endpoint_read,
- uv_endpoint_write,
- uv_add_to_pollset,
- uv_add_to_pollset_set,
- uv_delete_from_pollset_set,
- uv_endpoint_shutdown,
- uv_destroy,
- uv_get_resource_user,
- uv_get_peer,
- uv_get_fd};
-
-grpc_endpoint* grpc_tcp_create(uv_tcp_t* handle,
- grpc_resource_quota* resource_quota,
- char* peer_string) {
- grpc_tcp* tcp = (grpc_tcp*)gpr_malloc(sizeof(grpc_tcp));
- grpc_core::ExecCtx exec_ctx;
-
- if (grpc_tcp_trace.enabled()) {
- gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp);
+static void uv_resolve_async(grpc_custom_resolver* r, char* host, char* port) {
+ int status;
+ uv_getaddrinfo_t* req =
+ (uv_getaddrinfo_t*)gpr_malloc(sizeof(uv_getaddrinfo_t));
+ req->data = r;
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = GRPC_AF_UNSPEC; /* ipv4 or ipv6 */
+ hints.ai_socktype = GRPC_SOCK_STREAM; /* stream socket */
+ hints.ai_flags = GRPC_AI_PASSIVE; /* for wildcard IP address */
+ status = uv_getaddrinfo(uv_default_loop(), req, uv_resolve_callback, host,
+ port, &hints);
+ if (status != 0) {
+ gpr_free(req);
+ grpc_error* error = tcp_error_create("getaddrinfo failed", status);
+ grpc_custom_resolve_callback(r, NULL, error);
}
+}
- /* Disable Nagle's Algorithm */
- uv_tcp_nodelay(handle, 1);
-
- memset(tcp, 0, sizeof(grpc_tcp));
- tcp->base.vtable = &vtable;
- tcp->handle = handle;
- handle->data = tcp;
- gpr_ref_init(&tcp->refcount, 1);
- tcp->peer_string = gpr_strdup(peer_string);
- tcp->shutting_down = false;
- tcp->read_slices = NULL;
- tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
- grpc_resource_user_slice_allocator_init(
- &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
- /* Tell network status tracking code about the new endpoint */
- grpc_network_status_register_endpoint(&tcp->base);
-
-#ifndef GRPC_UV_TCP_HOLD_LOOP
- uv_unref((uv_handle_t*)handle);
-#endif
+grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async};
- return &tcp->base;
-}
+grpc_socket_vtable grpc_uv_socket_vtable = {
+ uv_socket_init, uv_socket_connect, uv_socket_destroy,
+ uv_socket_shutdown, uv_socket_close, uv_socket_write,
+ uv_socket_read, uv_socket_getpeername, uv_socket_getsockname,
+ uv_socket_setsockopt, uv_socket_bind, uv_socket_listen,
+ uv_socket_accept};
-#endif /* GRPC_UV */
+#endif