diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.c')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 78 |
1 files changed, 41 insertions, 37 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index e2869224f1..017f52c367 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -38,6 +38,7 @@ #include "src/core/lib/iomgr/tcp_posix.h" #include <errno.h> +#include <stdbool.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> @@ -74,7 +75,7 @@ typedef struct { grpc_endpoint base; grpc_fd *em_fd; int fd; - int finished_edge; + bool finished_edge; msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ size_t slice_size; gpr_refcount refcount; @@ -101,9 +102,9 @@ typedef struct { } grpc_tcp; static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success); + grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success); + grpc_error *error); static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -155,12 +156,15 @@ static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { TCP_UNREF(exec_ctx, tcp, "destroy"); } -static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) { +static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + grpc_error *error) { grpc_closure *cb = tcp->read_cb; - if (grpc_tcp_trace) { + if (false && grpc_tcp_trace) { size_t i; - gpr_log(GPR_DEBUG, "read: success=%d", success); + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + grpc_error_free_string(str); for (i = 0; i < tcp->incoming_buffer->count; i++) { char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -171,7 +175,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) { tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb(exec_ctx, cb->cb_arg, success); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } #define MAX_READ_IOVEC 4 @@ -219,15 +223,14 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - /* TODO(klempner): Log interesting errors */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, 0); + call_read_cb(exec_ctx, tcp, GRPC_OS_ERROR(errno, "recvmsg")); TCP_UNREF(exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, 0); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_CREATE("EOF")); TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); @@ -240,7 +243,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(exec_ctx, tcp, 1); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); TCP_UNREF(exec_ctx, tcp, "read"); } @@ -248,13 +251,13 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { } static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success) { + grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, 0); + call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); TCP_UNREF(exec_ctx, tcp, "read"); } else { tcp_continue_read(exec_ctx, tcp); @@ -271,17 +274,16 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { - tcp->finished_edge = 0; + tcp->finished_edge = false; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL); + grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); } } -typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result; - +/* returns true if done, false if pending; if returning true, *error is set */ #define MAX_WRITE_IOVEC 16 -static flush_result tcp_flush(grpc_tcp *tcp) { +static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -331,10 +333,10 @@ static flush_result tcp_flush(grpc_tcp *tcp) { if (errno == EAGAIN) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; - return FLUSH_PENDING; + return false; } else { - /* TODO(klempner): Log some of these */ - return FLUSH_ERROR; + *error = GRPC_OS_ERROR(errno, "sendmsg"); + return true; } } @@ -355,44 +357,44 @@ static flush_result tcp_flush(grpc_tcp *tcp) { } if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { - return FLUSH_DONE; + *error = GRPC_ERROR_NONE; + return true; } }; } static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - bool success) { + grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; - flush_result status; grpc_closure *cb; - if (!success) { + if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb(exec_ctx, cb->cb_arg, 0); + cb->cb(exec_ctx, cb->cb_arg, error); TCP_UNREF(exec_ctx, tcp, "write"); return; } - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { + if (!tcp_flush(tcp, &error)) { grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); - cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE); + cb->cb(exec_ctx, cb->cb_arg, error); GPR_TIMER_END("tcp_handle_write.cb", 0); TCP_UNREF(exec_ctx, tcp, "write"); + GRPC_ERROR_UNREF(error); } } static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *buf, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; - flush_result status; + grpc_error *error = GRPC_ERROR_NONE; - if (grpc_tcp_trace) { + if (false && grpc_tcp_trace) { size_t i; for (i = 0; i < buf->count; i++) { @@ -408,20 +410,22 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd) + ? GRPC_ERROR_CREATE("EOF") + : GRPC_ERROR_NONE, + NULL); return; } tcp->outgoing_buffer = buf; tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; - status = tcp_flush(tcp); - if (status == FLUSH_PENDING) { + if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } GPR_TIMER_END("tcp_write", 0); @@ -461,7 +465,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->incoming_buffer = NULL; tcp->slice_size = slice_size; tcp->iov_size = 1; - tcp->finished_edge = 1; + tcp->finished_edge = true; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); tcp->em_fd = em_fd; |