diff options
author | Michael Darakananda <pongad@google.com> | 2017-10-10 15:07:28 +1100 |
---|---|---|
committer | Michael Darakananda <pongad@google.com> | 2017-10-10 15:07:28 +1100 |
commit | df6e3750b24a584daa823e01b2a69ac65f59af83 (patch) | |
tree | 659cbc6c0021d03564b516110abd4b09bd17ce82 /src/core/lib/iomgr/tcp_uv.cc | |
parent | b268629245788bee994e96cb585a3ab2c78bafa8 (diff) | |
parent | b6142ef1a6dc70565f468346c7935f43dbcb55fa (diff) |
Merge branch 'master' into fix-php
Diffstat (limited to 'src/core/lib/iomgr/tcp_uv.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.cc | 381 |
1 files changed, 381 insertions, 0 deletions
diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc new file mode 100644 index 0000000000..e311964dbc --- /dev/null +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -0,0 +1,381 @@ +/* + * + * Copyright 2016 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include <limits.h> +#include <string.h> + +#include <grpc/slice_buffer.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_uv.h" +#include "src/core/lib/iomgr/network_status_tracker.h" +#include "src/core/lib/iomgr/resource_quota.h" +#include "src/core/lib/iomgr/tcp_uv.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/string.h" + +grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); + +typedef struct { + grpc_endpoint base; + gpr_refcount refcount; + + uv_write_t write_req; + uv_shutdown_t shutdown_req; + + uv_tcp_t *handle; + + grpc_closure *read_cb; + grpc_closure *write_cb; + + grpc_slice read_slice; + grpc_slice_buffer *read_slices; + grpc_slice_buffer *write_slices; + uv_buf_t *write_buffers; + + grpc_resource_user *resource_user; + + bool shutting_down; + + char *peer_string; + grpc_pollset *pollset; +} grpc_tcp; + +static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + grpc_slice_unref_internal(exec_ctx, tcp->read_slice); + grpc_resource_user_unref(exec_ctx, tcp->resource_user); + gpr_free(tcp->handle); + gpr_free(tcp->peer_string); + gpr_free(tcp); +} + +#ifndef NDEBUG +#define TCP_UNREF(exec_ctx, tcp, reason) \ + tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + const char *reason, const char *file, int line) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, + val - 1); + } + if (gpr_unref(&tcp->refcount)) { + tcp_free(exec_ctx, tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, + "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, + val + 1); + } + gpr_ref(&tcp->refcount); +} +#else +#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp)) +#define TCP_REF(tcp, reason) tcp_ref((tcp)) +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (gpr_unref(&tcp->refcount)) { + tcp_free(exec_ctx, tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +#endif + +static void uv_close_callback(uv_handle_t *handle) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp *tcp = (grpc_tcp *)handle->data; + TCP_UNREF(&exec_ctx, tcp, "destroy"); + grpc_exec_ctx_finish(&exec_ctx); +} + +static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + return grpc_resource_user_slice_malloc(exec_ctx, resource_user, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE); +} + +static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, + uv_buf_t *buf) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp *tcp = (grpc_tcp *)handle->data; + (void)suggested_size; + buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); + buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void read_callback(uv_stream_t *stream, ssize_t nread, + const uv_buf_t *buf) { + grpc_slice sub; + grpc_error *error; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp *tcp = (grpc_tcp *)stream->data; + grpc_closure *cb = tcp->read_cb; + if (nread == 0) { + // Nothing happened. Wait for the next callback + return; + } + TCP_UNREF(&exec_ctx, tcp, "read"); + tcp->read_cb = NULL; + // TODO(murgatroid99): figure out what the return value here means + uv_read_stop(stream); + if (nread == UV_EOF) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"); + } else if (nread > 0) { + // Successful read + sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); + grpc_slice_buffer_add(tcp->read_slices, sub); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); + error = GRPC_ERROR_NONE; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + size_t i; + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + + for (i = 0; i < tcp->read_slices->count; i++) { + char *dump = grpc_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, + dump); + gpr_free(dump); + } + } + } else { + // nread < 0: Error + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); + } + GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *read_slices, grpc_closure *cb) { + grpc_tcp *tcp = (grpc_tcp *)ep; + int status; + grpc_error *error = GRPC_ERROR_NONE; + GRPC_UV_ASSERT_SAME_THREAD(); + GPR_ASSERT(tcp->read_cb == NULL); + tcp->read_cb = cb; + tcp->read_slices = read_slices; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices); + TCP_REF(tcp, "read"); + // TODO(murgatroid99): figure out what the return value here means + status = + uv_read_start((uv_stream_t *)tcp->handle, alloc_uv_buf, read_callback); + if (status != 0) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed at start"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + grpc_slice_from_static_string(uv_strerror(status))); + GRPC_CLOSURE_SCHED(exec_ctx, cb, error); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); + } +} + +static void write_callback(uv_write_t *req, int status) { + grpc_tcp *tcp = (grpc_tcp *)req->data; + grpc_error *error; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_closure *cb = tcp->write_cb; + tcp->write_cb = NULL; + TCP_UNREF(&exec_ctx, tcp, "write"); + if (status == 0) { + error = GRPC_ERROR_NONE; + } else { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Write failed"); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); + } + gpr_free(tcp->write_buffers); + grpc_resource_user_free(&exec_ctx, tcp->resource_user, + sizeof(uv_buf_t) * tcp->write_slices->count); + GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_slice_buffer *write_slices, + grpc_closure *cb) { + grpc_tcp *tcp = (grpc_tcp *)ep; + uv_buf_t *buffers; + unsigned int buffer_count; + unsigned int i; + grpc_slice *slice; + uv_write_t *write_req; + GRPC_UV_ASSERT_SAME_THREAD(); + + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + size_t j; + + for (j = 0; j < write_slices->count; j++) { + char *data = grpc_dump_slice(write_slices->slices[j], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); + gpr_free(data); + } + } + + if (tcp->shutting_down) { + GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "TCP socket is shutting down")); + return; + } + + GPR_ASSERT(tcp->write_cb == NULL); + tcp->write_slices = write_slices; + GPR_ASSERT(tcp->write_slices->count <= UINT_MAX); + if (tcp->write_slices->count == 0) { + // No slices means we don't have to do anything, + // and libuv doesn't like empty writes + GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); + return; + } + + tcp->write_cb = cb; + buffer_count = (unsigned int)tcp->write_slices->count; + buffers = (uv_buf_t *)gpr_malloc(sizeof(uv_buf_t) * buffer_count); + grpc_resource_user_alloc(exec_ctx, tcp->resource_user, + sizeof(uv_buf_t) * buffer_count, NULL); + for (i = 0; i < buffer_count; i++) { + slice = &tcp->write_slices->slices[i]; + buffers[i].base = (char *)GRPC_SLICE_START_PTR(*slice); + buffers[i].len = GRPC_SLICE_LENGTH(*slice); + } + tcp->write_buffers = buffers; + write_req = &tcp->write_req; + write_req->data = tcp; + TCP_REF(tcp, "write"); + // TODO(murgatroid99): figure out what the return value here means + uv_write(write_req, (uv_stream_t *)tcp->handle, buffers, buffer_count, + write_callback); +} + +static void uv_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset *pollset) { + // No-op. We're ignoring pollsets currently + (void)exec_ctx; + (void)ep; + (void)pollset; + grpc_tcp *tcp = (grpc_tcp *)ep; + tcp->pollset = pollset; +} + +static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset) { + // No-op. We're ignoring pollsets currently + (void)exec_ctx; + (void)ep; + (void)pollset; +} + +static void shutdown_callback(uv_shutdown_t *req, int status) {} + +static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_error *why) { + grpc_tcp *tcp = (grpc_tcp *)ep; + if (!tcp->shutting_down) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + const char *str = grpc_error_string(why); + gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str); + } + tcp->shutting_down = true; + uv_shutdown_t *req = &tcp->shutdown_req; + uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); + grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); + } + GRPC_ERROR_UNREF(why); +} + +static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_network_status_unregister_endpoint(ep); + grpc_tcp *tcp = (grpc_tcp *)ep; + uv_close((uv_handle_t *)tcp->handle, uv_close_callback); +} + +static char *uv_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + +static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return tcp->resource_user; +} + +static int uv_get_fd(grpc_endpoint *ep) { return -1; } + +static grpc_endpoint_vtable vtable = { + uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset, + uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy, + uv_get_resource_user, uv_get_peer, uv_get_fd}; + +grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, + grpc_resource_quota *resource_quota, + char *peer_string) { + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); + } + + /* Disable Nagle's Algorithm */ + uv_tcp_nodelay(handle, 1); + + memset(tcp, 0, sizeof(grpc_tcp)); + tcp->base.vtable = &vtable; + tcp->handle = handle; + handle->data = tcp; + gpr_ref_init(&tcp->refcount, 1); + tcp->peer_string = gpr_strdup(peer_string); + tcp->shutting_down = false; + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); + /* Tell network status tracking code about the new endpoint */ + grpc_network_status_register_endpoint(&tcp->base); + +#ifndef GRPC_UV_TCP_HOLD_LOOP + uv_unref((uv_handle_t *)handle); +#endif + + grpc_exec_ctx_finish(&exec_ctx); + return &tcp->base; +} + +#endif /* GRPC_UV */ |