diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.c')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 92 |
1 files changed, 37 insertions, 55 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 880af93ee1..c48f4e83cf 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -46,9 +46,9 @@ #include <sys/types.h> #include <unistd.h> +#include <grpc/slice.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/slice.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -56,6 +56,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" #ifdef GRPC_HAVE_MSG_NOSIGNAL @@ -83,10 +84,10 @@ typedef struct { gpr_atm shutdown_count; /* garbage after the last read */ - gpr_slice_buffer last_read_buffer; + grpc_slice_buffer last_read_buffer; - gpr_slice_buffer *incoming_buffer; - gpr_slice_buffer *outgoing_buffer; + grpc_slice_buffer *incoming_buffer; + grpc_slice_buffer *outgoing_buffer; /** slice within outgoing_buffer to write next */ size_t outgoing_slice_idx; /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ @@ -102,7 +103,7 @@ typedef struct { char *peer_string; - grpc_resource_user resource_user; + grpc_resource_user *resource_user; grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; @@ -110,28 +111,18 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - grpc_error *error); - -static void tcp_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { - if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) { - grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, - grpc_closure_create(tcp_unref_closure, tcp)); - } -} static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); grpc_fd_shutdown(exec_ctx, tcp->em_fd); + grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, "tcp_unref_orphan"); - gpr_slice_buffer_destroy(&tcp->last_read_buffer); - grpc_resource_user_destroy(exec_ctx, &tcp->resource_user); + grpc_slice_buffer_destroy(&tcp->last_read_buffer); + grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -168,16 +159,10 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - TCP_UNREF(exec_ctx, arg, "resource_user"); -} - static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } @@ -191,8 +176,8 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, gpr_log(GPR_DEBUG, "read: error=%s", str); grpc_error_free_string(str); for (i = 0; i < tcp->incoming_buffer->count; i++) { - char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], - GPR_DUMP_HEX | GPR_DUMP_ASCII); + char *dump = grpc_dump_slice(tcp->incoming_buffer->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump); gpr_free(dump); } @@ -216,8 +201,8 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GPR_TIMER_BEGIN("tcp_continue_read", 0); for (i = 0; i < tcp->incoming_buffer->count; i++) { - iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); - iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); + iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]); + iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]); } msg.msg_name = NULL; @@ -244,19 +229,19 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg")); TCP_UNREF(exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed")); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { - gpr_slice_buffer_trim_end( + grpc_slice_buffer_trim_end( tcp->incoming_buffer, tcp->incoming_buffer->length - (size_t)read_bytes, &tcp->last_read_buffer); @@ -275,8 +260,8 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = tcpp; if (error != GRPC_ERROR_NONE) { - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { @@ -301,8 +286,8 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, GPR_ASSERT(!tcp->finished_edge); if (error != GRPC_ERROR_NONE) { - gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { @@ -311,13 +296,13 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *incoming_buffer, grpc_closure *cb) { + grpc_slice_buffer *incoming_buffer, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; - gpr_slice_buffer_reset_and_unref(incoming_buffer); - gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(incoming_buffer); + grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; @@ -347,11 +332,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = - GPR_SLICE_START_PTR( + GRPC_SLICE_START_PTR( tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + tcp->outgoing_byte_idx; iov[iov_size].iov_len = - GPR_SLICE_LENGTH( + GRPC_SLICE_LENGTH( tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - tcp->outgoing_byte_idx; sending_length += iov[iov_size].iov_len; @@ -392,7 +377,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { size_t slice_length; tcp->outgoing_slice_idx--; - slice_length = GPR_SLICE_LENGTH( + slice_length = GRPC_SLICE_LENGTH( tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); if (slice_length > trailing) { tcp->outgoing_byte_idx = slice_length - trailing; @@ -442,7 +427,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, - gpr_slice_buffer *buf, grpc_closure *cb) { + grpc_slice_buffer *buf, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_error *error = GRPC_ERROR_NONE; @@ -451,7 +436,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (i = 0; i < buf->count; i++) { char *data = - gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); + grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); gpr_free(data); } @@ -515,7 +500,7 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->resource_user; + return tcp->resource_user; } static const grpc_endpoint_vtable vtable = {tcp_read, @@ -543,20 +528,18 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->slice_size = slice_size; tcp->iov_size = 1; tcp->finished_edge = true; - /* paired with unref in grpc_tcp_destroy, and with the shutdown for our - * resource_user */ - gpr_ref_init(&tcp->refcount, 2); + /* paired with unref in grpc_tcp_destroy */ + gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; tcp->read_closure.cb = tcp_handle_read; tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; - gpr_slice_buffer_init(&tcp->last_read_buffer); - grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); - grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, - &tcp->resource_user, - tcp_read_allocation_done, tcp); + grpc_slice_buffer_init(&tcp->last_read_buffer); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -576,8 +559,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); - gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer); + grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } |