diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 156 |
1 files changed, 89 insertions, 67 deletions
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 40897fb8f8..6b21bcf6a9 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -258,6 +258,8 @@ typedef struct { grpc_endpoint base; grpc_fd *em_fd; int fd; + int iov_size; /* Number of slices to allocate per read attempt */ + int finished_edge; size_t slice_size; gpr_refcount refcount; @@ -315,9 +317,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { - grpc_tcp *tcp = (grpc_tcp *)arg; - int iov_size = 1; +static void grpc_tcp_continue_read(grpc_tcp *tcp) { gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; @@ -327,88 +327,103 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { gpr_slice *final_slices; size_t final_nslices; + GPR_ASSERT(!tcp->finished_edge); GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0); slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, 0); - if (!success) { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); - grpc_tcp_unref(tcp); - return; + allocated_bytes = slice_state_append_blocks_into_iovec( + &read_state, iov, tcp->iov_size, tcp->slice_size); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = tcp->iov_size; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + GRPC_TIMER_MARK(RECVMSG_BEGIN, 0); + do { + read_bytes = recvmsg(tcp->fd, &msg, 0); + } while (read_bytes < 0 && errno == EINTR); + GRPC_TIMER_MARK(RECVMSG_END, 0); + + if (read_bytes < allocated_bytes) { + /* TODO(klempner): Consider a second read first, in hopes of getting a + * quick EAGAIN and saving a bunch of allocations. */ + slice_state_remove_last(&read_state, read_bytes < 0 + ? allocated_bytes + : allocated_bytes - read_bytes); } - /* TODO(klempner): Limit the amount we read at once. */ - for (;;) { - allocated_bytes = slice_state_append_blocks_into_iovec( - &read_state, iov, iov_size, tcp->slice_size); - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = iov_size; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - - GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0); - do { - read_bytes = recvmsg(tcp->fd, &msg, 0); - } while (read_bytes < 0 && errno == EINTR); - GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0); - - if (read_bytes < allocated_bytes) { - /* TODO(klempner): Consider a second read first, in hopes of getting a - * quick EAGAIN and saving a bunch of allocations. */ - slice_state_remove_last(&read_state, read_bytes < 0 - ? allocated_bytes - : allocated_bytes - read_bytes); - } - - if (read_bytes < 0) { - /* NB: After calling the user_cb a parallel call of the read handler may - * be running. */ - if (errno == EAGAIN) { - if (slice_state_has_available(&read_state)) { - /* TODO(klempner): We should probably do the call into the application - without all this junk on the stack */ - /* FIXME(klempner): Refcount properly */ - slice_state_transfer_ownership(&read_state, &final_slices, - &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); - } else { - /* Spurious read event, consume it here */ - slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); - } - } else { - /* TODO(klempner): Log interesting errors */ - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); - slice_state_destroy(&read_state); - grpc_tcp_unref(tcp); + if (read_bytes < 0) { + /* NB: After calling the user_cb a parallel call of the read handler may + * be running. */ + if (errno == EAGAIN) { + if (tcp->iov_size > 1) { + tcp->iov_size /= 2; } - return; - } else if (read_bytes == 0) { - /* 0 read size ==> end of stream */ if (slice_state_has_available(&read_state)) { - /* there were bytes already read: pass them up to the application */ + /* TODO(klempner): We should probably do the call into the application + without all this junk on the stack */ + /* FIXME(klempner): Refcount properly */ slice_state_transfer_ownership(&read_state, &final_slices, &final_nslices); - call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); + tcp->finished_edge = 1; + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); } else { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); + /* We've consumed the edge, request a new one */ + slice_state_destroy(&read_state); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } + } else { + /* TODO(klempner): Log interesting errors */ + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_ERROR); slice_state_destroy(&read_state); grpc_tcp_unref(tcp); - return; - } else if (iov_size < MAX_READ_IOVEC) { - ++iov_size; } + } else if (read_bytes == 0) { + /* 0 read size ==> end of stream */ + if (slice_state_has_available(&read_state)) { + /* there were bytes already read: pass them up to the application */ + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_EOF); + } else { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_EOF); + } + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); + } else { + if (tcp->iov_size < MAX_READ_IOVEC) { + ++tcp->iov_size; + } + GPR_ASSERT(slice_state_has_available(&read_state)); + slice_state_transfer_ownership(&read_state, &final_slices, + &final_nslices); + call_read_cb(tcp, final_slices, final_nslices, GRPC_ENDPOINT_CB_OK); + slice_state_destroy(&read_state); + grpc_tcp_unref(tcp); } + GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); } +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { + grpc_tcp *tcp = (grpc_tcp *)arg; + GPR_ASSERT(!tcp->finished_edge); + + if (!success) { + call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); + grpc_tcp_unref(tcp); + } else { + grpc_tcp_continue_read(tcp); + } +} + static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -416,7 +431,12 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, tcp->read_cb = cb; tcp->read_user_data = user_data; gpr_ref(&tcp->refcount); - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + if (tcp->finished_edge) { + tcp->finished_edge = 0; + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + } else { + grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp); + } } #define MAX_WRITE_IOVEC 16 @@ -554,6 +574,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { tcp->read_user_data = NULL; tcp->write_user_data = NULL; tcp->slice_size = slice_size; + tcp->iov_size = 1; + tcp->finished_edge = 1; slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); |