aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.c')
-rw-r--r--src/core/lib/iomgr/tcp_posix.c92
1 files changed, 37 insertions, 55 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 880af93ee1..c48f4e83cf 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -46,9 +46,9 @@
#include <sys/types.h>
#include <unistd.h>
+#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -56,6 +56,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#ifdef GRPC_HAVE_MSG_NOSIGNAL
@@ -83,10 +84,10 @@ typedef struct {
gpr_atm shutdown_count;
/* garbage after the last read */
- gpr_slice_buffer last_read_buffer;
+ grpc_slice_buffer last_read_buffer;
- gpr_slice_buffer *incoming_buffer;
- gpr_slice_buffer *outgoing_buffer;
+ grpc_slice_buffer *incoming_buffer;
+ grpc_slice_buffer *outgoing_buffer;
/** slice within outgoing_buffer to write next */
size_t outgoing_slice_idx;
/** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
@@ -102,7 +103,7 @@ typedef struct {
char *peer_string;
- grpc_resource_user resource_user;
+ grpc_resource_user *resource_user;
grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
@@ -110,28 +111,18 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
-static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- grpc_error *error);
-
-static void tcp_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx,
- grpc_tcp *tcp) {
- if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) {
- grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user,
- grpc_closure_create(tcp_unref_closure, tcp));
- }
-}
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
grpc_fd_shutdown(exec_ctx, tcp->em_fd);
+ grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
- gpr_slice_buffer_destroy(&tcp->last_read_buffer);
- grpc_resource_user_destroy(exec_ctx, &tcp->resource_user);
+ grpc_slice_buffer_destroy(&tcp->last_read_buffer);
+ grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -168,16 +159,10 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
-static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_error *error) {
- TCP_UNREF(exec_ctx, arg, "resource_user");
-}
-
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
- tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
- gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@@ -191,8 +176,8 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
gpr_log(GPR_DEBUG, "read: error=%s", str);
grpc_error_free_string(str);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
- char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ char *dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
}
@@ -216,8 +201,8 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GPR_TIMER_BEGIN("tcp_continue_read", 0);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
- iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
- iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
+ iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
+ iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
}
msg.msg_name = NULL;
@@ -244,19 +229,19 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("Socket closed"));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
- gpr_slice_buffer_trim_end(
+ grpc_slice_buffer_trim_end(
tcp->incoming_buffer,
tcp->incoming_buffer->length - (size_t)read_bytes,
&tcp->last_read_buffer);
@@ -275,8 +260,8 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
grpc_error *error) {
grpc_tcp *tcp = tcpp;
if (error != GRPC_ERROR_NONE) {
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -301,8 +286,8 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
GPR_ASSERT(!tcp->finished_edge);
if (error != GRPC_ERROR_NONE) {
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -311,13 +296,13 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
}
static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
+ grpc_slice_buffer *incoming_buffer, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
- gpr_slice_buffer_reset_and_unref(incoming_buffer);
- gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref(incoming_buffer);
+ grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = false;
@@ -347,11 +332,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
- GPR_SLICE_START_PTR(
+ GRPC_SLICE_START_PTR(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
tcp->outgoing_byte_idx;
iov[iov_size].iov_len =
- GPR_SLICE_LENGTH(
+ GRPC_SLICE_LENGTH(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
tcp->outgoing_byte_idx;
sending_length += iov[iov_size].iov_len;
@@ -392,7 +377,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
size_t slice_length;
tcp->outgoing_slice_idx--;
- slice_length = GPR_SLICE_LENGTH(
+ slice_length = GRPC_SLICE_LENGTH(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
if (slice_length > trailing) {
tcp->outgoing_byte_idx = slice_length - trailing;
@@ -442,7 +427,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
}
static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- gpr_slice_buffer *buf, grpc_closure *cb) {
+ grpc_slice_buffer *buf, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_error *error = GRPC_ERROR_NONE;
@@ -451,7 +436,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
for (i = 0; i < buf->count; i++) {
char *data =
- gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
@@ -515,7 +500,7 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- return &tcp->resource_user;
+ return tcp->resource_user;
}
static const grpc_endpoint_vtable vtable = {tcp_read,
@@ -543,20 +528,18 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = true;
- /* paired with unref in grpc_tcp_destroy, and with the shutdown for our
- * resource_user */
- gpr_ref_init(&tcp->refcount, 2);
+ /* paired with unref in grpc_tcp_destroy */
+ gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
tcp->read_closure.cb = tcp_handle_read;
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
- gpr_slice_buffer_init(&tcp->last_read_buffer);
- grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string);
- grpc_resource_user_slice_allocator_init(&tcp->slice_allocator,
- &tcp->resource_user,
- tcp_read_allocation_done, tcp);
+ grpc_slice_buffer_init(&tcp->last_read_buffer);
+ tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(
+ &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
@@ -576,8 +559,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
- tcp_maybe_shutdown_resource_user(exec_ctx, tcp);
- gpr_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}