aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.c')
-rw-r--r--src/core/lib/iomgr/tcp_posix.c134
1 files changed, 98 insertions, 36 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 18c63cfbae..540305e4fa 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -46,9 +46,9 @@
#include <sys/types.h>
#include <unistd.h>
+#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/slice.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -56,6 +56,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
#ifdef GRPC_HAVE_MSG_NOSIGNAL
@@ -80,12 +81,13 @@ typedef struct {
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
size_t slice_size;
gpr_refcount refcount;
+ gpr_atm shutdown_count;
/* garbage after the last read */
- gpr_slice_buffer last_read_buffer;
+ grpc_slice_buffer last_read_buffer;
- gpr_slice_buffer *incoming_buffer;
- gpr_slice_buffer *outgoing_buffer;
+ grpc_slice_buffer *incoming_buffer;
+ grpc_slice_buffer *outgoing_buffer;
/** slice within outgoing_buffer to write next */
size_t outgoing_slice_idx;
/** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
@@ -100,8 +102,17 @@ typedef struct {
grpc_closure write_closure;
char *peer_string;
+
+ grpc_resource_user *resource_user;
+ grpc_resource_user_slice_allocator slice_allocator;
} grpc_tcp;
+static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) {
+ return grpc_error_set_str(
+ grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd),
+ GRPC_ERROR_STR_TARGET_ADDRESS, tcp->peer_string);
+}
+
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
@@ -110,12 +121,14 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_fd_shutdown(exec_ctx, tcp->em_fd);
+ grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
- gpr_slice_buffer_destroy(&tcp->last_read_buffer);
+ grpc_slice_buffer_destroy(&tcp->last_read_buffer);
+ grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
@@ -155,6 +168,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@@ -168,8 +182,8 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
gpr_log(GPR_DEBUG, "read: error=%s", str);
grpc_error_free_string(str);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
- char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
- GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ char *dump = grpc_dump_slice(tcp->incoming_buffer->slices[i],
+ GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
}
@@ -181,7 +195,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
}
#define MAX_READ_IOVEC 4
-static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
@@ -192,13 +206,9 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
GPR_TIMER_BEGIN("tcp_continue_read", 0);
- while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
- gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
- gpr_slice_malloc(tcp->slice_size));
- }
for (i = 0; i < tcp->incoming_buffer->count; i++) {
- iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
- iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
+ iov[i].iov_base = GRPC_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
+ iov[i].iov_len = GRPC_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
}
msg.msg_name = NULL;
@@ -225,19 +235,21 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ call_read_cb(exec_ctx, tcp,
+ tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF"));
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ call_read_cb(exec_ctx, tcp,
+ tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
- gpr_slice_buffer_trim_end(
+ grpc_slice_buffer_trim_end(
tcp->incoming_buffer,
tcp->incoming_buffer->length - (size_t)read_bytes,
&tcp->last_read_buffer);
@@ -252,13 +264,38 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
GPR_TIMER_END("tcp_continue_read", 0);
}
+static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
+ grpc_error *error) {
+ grpc_tcp *tcp = tcpp;
+ if (error != GRPC_ERROR_NONE) {
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
+ TCP_UNREF(exec_ctx, tcp, "read");
+ } else {
+ tcp_do_read(exec_ctx, tcp);
+ }
+}
+
+static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
+ if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
+ grpc_resource_user_alloc_slices(
+ exec_ctx, &tcp->slice_allocator, tcp->slice_size,
+ (size_t)tcp->iov_size - tcp->incoming_buffer->count,
+ tcp->incoming_buffer);
+ } else {
+ tcp_do_read(exec_ctx, tcp);
+ }
+}
+
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
if (error != GRPC_ERROR_NONE) {
- gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -267,13 +304,13 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
}
static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
+ grpc_slice_buffer *incoming_buffer, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
- gpr_slice_buffer_reset_and_unref(incoming_buffer);
- gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref(incoming_buffer);
+ grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = false;
@@ -303,11 +340,11 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
iov_size != MAX_WRITE_IOVEC;
iov_size++) {
iov[iov_size].iov_base =
- GPR_SLICE_START_PTR(
+ GRPC_SLICE_START_PTR(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
tcp->outgoing_byte_idx;
iov[iov_size].iov_len =
- GPR_SLICE_LENGTH(
+ GRPC_SLICE_LENGTH(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
tcp->outgoing_byte_idx;
sending_length += iov[iov_size].iov_len;
@@ -336,8 +373,13 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
return false;
+ } else if (errno == EPIPE) {
+ *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
+ GRPC_ERROR_INT_GRPC_STATUS,
+ GRPC_STATUS_UNAVAILABLE);
+ return true;
} else {
- *error = GRPC_OS_ERROR(errno, "sendmsg");
+ *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
return true;
}
}
@@ -348,7 +390,7 @@ static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
size_t slice_length;
tcp->outgoing_slice_idx--;
- slice_length = GPR_SLICE_LENGTH(
+ slice_length = GRPC_SLICE_LENGTH(
tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
if (slice_length > trailing) {
tcp->outgoing_byte_idx = slice_length - trailing;
@@ -398,7 +440,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
}
static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- gpr_slice_buffer *buf, grpc_closure *cb) {
+ grpc_slice_buffer *buf, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_error *error = GRPC_ERROR_NONE;
@@ -407,7 +449,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
for (i = 0; i < buf->count; i++) {
char *data =
- gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
+ grpc_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
@@ -418,9 +460,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd)
- ? GRPC_ERROR_CREATE("EOF")
- : GRPC_ERROR_NONE,
+ grpc_exec_ctx_sched(exec_ctx, cb,
+ grpc_fd_is_shutdown(tcp->em_fd)
+ ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
+ : GRPC_ERROR_NONE,
NULL);
return;
}
@@ -464,11 +507,21 @@ static char *tcp_get_peer(grpc_endpoint *ep) {
return gpr_strdup(tcp->peer_string);
}
+static int tcp_get_fd(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return tcp->fd;
+}
+
static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
return grpc_fd_get_workqueue(tcp->em_fd);
}
+static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) {
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ return tcp->resource_user;
+}
+
static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_write,
tcp_get_workqueue,
@@ -476,10 +529,13 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
tcp_add_to_pollset_set,
tcp_shutdown,
tcp_destroy,
- tcp_get_peer};
+ tcp_get_resource_user,
+ tcp_get_peer,
+ tcp_get_fd};
-grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
- const char *peer_string) {
+grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
+ grpc_resource_quota *resource_quota,
+ size_t slice_size, const char *peer_string) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
tcp->peer_string = gpr_strdup(peer_string);
@@ -494,12 +550,16 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
+ gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
tcp->read_closure.cb = tcp_handle_read;
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
- gpr_slice_buffer_init(&tcp->last_read_buffer);
+ grpc_slice_buffer_init(&tcp->last_read_buffer);
+ tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
+ grpc_resource_user_slice_allocator_init(
+ &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
@@ -514,10 +574,12 @@ int grpc_tcp_fd(grpc_endpoint *ep) {
void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int *fd, grpc_closure *done) {
+ grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
+ grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}