aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.c')
-rw-r--r--src/core/lib/iomgr/tcp_posix.c82
1 files changed, 45 insertions, 37 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 7210aef5d5..2ab45e33ce 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -35,9 +35,11 @@
#ifdef GPR_POSIX_SOCKET
+#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/tcp_posix.h"
#include <errno.h>
+#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
@@ -74,7 +76,7 @@ typedef struct {
grpc_endpoint base;
grpc_fd *em_fd;
int fd;
- int finished_edge;
+ bool finished_edge;
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
size_t slice_size;
gpr_refcount refcount;
@@ -101,9 +103,9 @@ typedef struct {
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success);
+ grpc_error *error);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success);
+ grpc_error *error);
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -151,27 +153,31 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+ grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
TCP_UNREF(exec_ctx, tcp, "destroy");
}
-static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
+static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
+ grpc_error *error) {
grpc_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
- gpr_log(GPR_DEBUG, "read: success=%d", success);
+ const char *str = grpc_error_string(error);
+ gpr_log(GPR_DEBUG, "read: error=%s", str);
+ grpc_error_free_string(str);
for (i = 0; i < tcp->incoming_buffer->count; i++) {
char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
}
}
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- cb->cb(exec_ctx, cb->cb_arg, success);
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
#define MAX_READ_IOVEC 4
@@ -219,15 +225,14 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- /* TODO(klempner): Log interesting errors */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, 0);
+ call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg"));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, 0);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF"));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
@@ -240,7 +245,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
++tcp->iov_size;
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
- call_read_cb(exec_ctx, tcp, 1);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
TCP_UNREF(exec_ctx, tcp, "read");
}
@@ -248,13 +253,13 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
}
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success) {
+ grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp, 0);
+ call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
tcp_continue_read(exec_ctx, tcp);
@@ -271,17 +276,16 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
- tcp->finished_edge = 0;
+ tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
}
}
-typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
-
+/* returns true if done, false if pending; if returning true, *error is set */
#define MAX_WRITE_IOVEC 16
-static flush_result tcp_flush(grpc_tcp *tcp) {
+static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -331,10 +335,10 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
if (errno == EAGAIN) {
tcp->outgoing_slice_idx = unwind_slice_idx;
tcp->outgoing_byte_idx = unwind_byte_idx;
- return FLUSH_PENDING;
+ return false;
} else {
- /* TODO(klempner): Log some of these */
- return FLUSH_ERROR;
+ *error = GRPC_OS_ERROR(errno, "sendmsg");
+ return true;
}
}
@@ -355,42 +359,42 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
}
if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
- return FLUSH_DONE;
+ *error = GRPC_ERROR_NONE;
+ return true;
}
};
}
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- bool success) {
+ grpc_error *error) {
grpc_tcp *tcp = (grpc_tcp *)arg;
- flush_result status;
grpc_closure *cb;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb->cb(exec_ctx, cb->cb_arg, 0);
+ cb->cb(exec_ctx, cb->cb_arg, error);
TCP_UNREF(exec_ctx, tcp, "write");
return;
}
- status = tcp_flush(tcp);
- if (status == FLUSH_PENDING) {
+ if (!tcp_flush(tcp, &error)) {
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
cb = tcp->write_cb;
tcp->write_cb = NULL;
GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
- cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
+ cb->cb(exec_ctx, cb->cb_arg, error);
GPR_TIMER_END("tcp_handle_write.cb", 0);
TCP_UNREF(exec_ctx, tcp, "write");
+ GRPC_ERROR_UNREF(error);
}
}
static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *buf, grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- flush_result status;
+ grpc_error *error = GRPC_ERROR_NONE;
if (grpc_tcp_trace) {
size_t i;
@@ -398,7 +402,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
for (i = 0; i < buf->count; i++) {
char *data =
gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
}
@@ -408,20 +412,22 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd)
+ ? GRPC_ERROR_CREATE("EOF")
+ : GRPC_ERROR_NONE,
+ NULL);
return;
}
tcp->outgoing_buffer = buf;
tcp->outgoing_slice_idx = 0;
tcp->outgoing_byte_idx = 0;
- status = tcp_flush(tcp);
- if (status == FLUSH_PENDING) {
+ if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL);
+ grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
}
GPR_TIMER_END("tcp_write", 0);
@@ -461,7 +467,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->incoming_buffer = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
- tcp->finished_edge = 1;
+ tcp->finished_edge = true;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
@@ -470,6 +476,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->write_closure.cb = tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
gpr_slice_buffer_init(&tcp->last_read_buffer);
+ /* Tell network status tracker about new endpoint */
+ grpc_network_status_register_endpoint(&tcp->base);
return &tcp->base;
}