diff options
Diffstat (limited to 'src/core/iomgr/tcp_posix.c')
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 147 |
1 files changed, 80 insertions, 67 deletions
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 68f469c368..54ebad7dbc 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -78,6 +78,9 @@ typedef struct { size_t slice_size; gpr_refcount refcount; + /* garbage after the last read */ + gpr_slice_buffer last_read_buffer; + gpr_slice_buffer *incoming_buffer; gpr_slice_buffer *outgoing_buffer; /** slice within outgoing_buffer to write next */ @@ -85,39 +88,43 @@ typedef struct { /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ size_t outgoing_byte_idx; - grpc_iomgr_closure *read_cb; - grpc_iomgr_closure *write_cb; + grpc_closure *read_cb; + grpc_closure *write_cb; - grpc_iomgr_closure read_closure; - grpc_iomgr_closure write_closure; + grpc_closure read_closure; + grpc_closure write_closure; char *peer_string; } grpc_tcp; -static void tcp_handle_read(void *arg /* grpc_tcp */, int success); -static void tcp_handle_write(void *arg /* grpc_tcp */, int success); +static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + int success); +static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + int success); -static void tcp_shutdown(grpc_endpoint *ep) { +static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_fd_shutdown(tcp->em_fd); + grpc_fd_shutdown(exec_ctx, tcp->em_fd); } -static void tcp_free(grpc_tcp *tcp) { - grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); +static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_slice_buffer_destroy(&tcp->last_read_buffer); gpr_free(tcp->peer_string); gpr_free(tcp); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG -#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(cl, tcp, reason) \ + tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, - int line) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } @@ -128,24 +135,24 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_destroy(grpc_endpoint *ep) { +static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - TCP_UNREF(tcp, "destroy"); + TCP_UNREF(exec_ctx, tcp, "destroy"); } -static void call_read_cb(grpc_tcp *tcp, int success) { - grpc_iomgr_closure *cb = tcp->read_cb; +static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) { + grpc_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { size_t i; @@ -160,11 +167,11 @@ static void call_read_cb(grpc_tcp *tcp, int success) { tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb(cb->cb_arg, success); + cb->cb(exec_ctx, cb->cb_arg, success); } #define MAX_READ_IOVEC 4 -static void tcp_continue_read(grpc_tcp *tcp) { +static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; @@ -206,69 +213,71 @@ static void tcp_continue_read(grpc_tcp *tcp) { tcp->iov_size /= 2; } /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { /* TODO(klempner): Log interesting errors */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0); - TCP_UNREF(tcp, "read"); + call_read_cb(exec_ctx, tcp, 0); + TCP_UNREF(exec_ctx, tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0); - TCP_UNREF(tcp, "read"); + call_read_cb(exec_ctx, tcp, 0); + TCP_UNREF(exec_ctx, tcp, "read"); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { gpr_slice_buffer_trim_end( tcp->incoming_buffer, - tcp->incoming_buffer->length - (size_t)read_bytes); + tcp->incoming_buffer->length - (size_t)read_bytes, + &tcp->last_read_buffer); } else if (tcp->iov_size < MAX_READ_IOVEC) { ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(tcp, 1); - TCP_UNREF(tcp, "read"); + call_read_cb(exec_ctx, tcp, 1); + TCP_UNREF(exec_ctx, tcp, "read"); } GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); } -static void tcp_handle_read(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + int success) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); if (!success) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0); - TCP_UNREF(tcp, "read"); + call_read_cb(exec_ctx, tcp, 0); + TCP_UNREF(exec_ctx, tcp, "read"); } else { - tcp_continue_read(tcp); + tcp_continue_read(exec_ctx, tcp); } } -static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, - gpr_slice_buffer *incoming_buffer, - grpc_iomgr_closure *cb) { +static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + gpr_slice_buffer *incoming_buffer, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; gpr_slice_buffer_reset_and_unref(incoming_buffer); + gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = 0; - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1); + grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1); } - /* TODO(ctiller): immediate return */ - return GRPC_ENDPOINT_PENDING; } +typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result; + #define MAX_WRITE_IOVEC 16 -static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { +static flush_result tcp_flush(grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -318,10 +327,10 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { if (errno == EAGAIN) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; - return GRPC_ENDPOINT_PENDING; + return FLUSH_PENDING; } else { /* TODO(klempner): Log some of these */ - return GRPC_ENDPOINT_ERROR; + return FLUSH_ERROR; } } @@ -342,42 +351,42 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { } if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { - return GRPC_ENDPOINT_DONE; + return FLUSH_DONE; } }; } -static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + int success) { grpc_tcp *tcp = (grpc_tcp *)arg; - grpc_endpoint_op_status status; - grpc_iomgr_closure *cb; + flush_result status; + grpc_closure *cb; if (!success) { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb(cb->cb_arg, 0); - TCP_UNREF(tcp, "write"); + cb->cb(exec_ctx, cb->cb_arg, 0); + TCP_UNREF(exec_ctx, tcp, "write"); return; } GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); status = tcp_flush(tcp); - if (status == GRPC_ENDPOINT_PENDING) { - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); + if (status == FLUSH_PENDING) { + grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE); - TCP_UNREF(tcp, "write"); + cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE); + TCP_UNREF(exec_ctx, tcp, "write"); } GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } -static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, - gpr_slice_buffer *buf, - grpc_iomgr_closure *cb) { +static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + gpr_slice_buffer *buf, grpc_closure *cb) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_endpoint_op_status status; + flush_result status; if (grpc_tcp_trace) { size_t i; @@ -395,32 +404,35 @@ static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, if (buf->length == 0) { GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); - return GRPC_ENDPOINT_DONE; + grpc_exec_ctx_enqueue(exec_ctx, cb, 1); + return; } tcp->outgoing_buffer = buf; tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; status = tcp_flush(tcp); - if (status == GRPC_ENDPOINT_PENDING) { + if (status == FLUSH_PENDING) { TCP_REF(tcp, "write"); tcp->write_cb = cb; - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); + grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + } else { + grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE); } GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); - return status; } -static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { +static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset *pollset) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_add_fd(pollset, tcp->em_fd); + grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd); } -static void tcp_add_to_pollset_set(grpc_endpoint *ep, +static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset_set) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); + grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd); } static char *tcp_get_peer(grpc_endpoint *ep) { @@ -451,6 +463,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->read_closure.cb_arg = tcp; tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; + gpr_slice_buffer_init(&tcp->last_read_buffer); return &tcp->base; } |