diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 312 |
1 files changed, 299 insertions, 13 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index b53ffbf01c..ac1e919acb 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -27,7 +27,9 @@ #include <errno.h> #include <limits.h> +#include <netinet/in.h> #include <stdbool.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> @@ -46,6 +48,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gpr/useful.h" +#include "src/core/lib/iomgr/buffer_list.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" @@ -97,17 +100,42 @@ struct grpc_tcp { grpc_closure read_done_closure; grpc_closure write_done_closure; + grpc_closure error_closure; char* peer_string; grpc_resource_user* resource_user; grpc_resource_user_slice_allocator slice_allocator; + + grpc_core::TracedBuffer* tb_head; /* List of traced buffers */ + gpr_mu tb_mu; /* Lock for access to list of traced buffers */ + + /* grpc_endpoint_write takes an argument which if non-null means that the + * transport layer wants the TCP layer to collect timestamps for this write. + * This arg is forwarded to the timestamps callback function when the ACK + * timestamp is received from the kernel. This arg is a (void *) which allows + * users of this API to pass in a pointer to any kind of structure. This + * structure could actually be a tag or any book-keeping object that the user + * can use to distinguish between different traced writes. The only + * requirement from the TCP endpoint layer is that this arg should be non-null + * if the user wants timestamps for the write. */ + void* outgoing_buffer_arg; + /* A counter which starts at 0. It is initialized the first time the socket + * options for collecting timestamps are set, and is incremented with each + * byte sent. */ + int bytes_counter; + bool socket_ts_enabled; /* True if timestamping options are set on the socket + */ + gpr_atm + stop_error_notification; /* Set to 1 if we do not want to be notified on + errors anymore */ }; struct backup_poller { gpr_mu* pollset_mu; grpc_closure run_poller; }; + } // namespace #define BACKUP_POLLER_POLLSET(b) ((grpc_pollset*)((b) + 1)) @@ -302,6 +330,7 @@ static void tcp_free(grpc_tcp* tcp) { grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); + gpr_mu_destroy(&tcp->tb_mu); gpr_free(tcp); } @@ -347,6 +376,10 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + if (grpc_event_engine_can_track_errors()) { + gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); + grpc_fd_set_error(tcp->em_fd); + } TCP_UNREF(tcp, "destroy"); } @@ -513,6 +546,235 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, } } +/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number + * of bytes sent. */ +ssize_t tcp_send(int fd, const struct msghdr* msg) { + GPR_TIMER_SCOPE("sendmsg", 1); + ssize_t sent_length; + do { + /* TODO(klempner): Cork if this is a partial write */ + GRPC_STATS_INC_SYSCALL_WRITE(); + sent_length = sendmsg(fd, msg, SENDMSG_FLAGS); + } while (sent_length < 0 && errno == EINTR); + return sent_length; +} + +/** This is to be called if outgoing_buffer_arg is not null. On linux platforms, + * this will call sendmsg with socket options set to collect timestamps inside + * the kernel. On return, sent_length is set to the return value of the sendmsg + * call. Returns false if setting the socket options failed. This is not + * implemented for non-linux platforms currently, and crashes out. + */ +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, grpc_error** error); + +/** The callback function to be invoked when we get an error on the socket. */ +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); + +#ifdef GRPC_LINUX_ERRQUEUE +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, + grpc_error** error) { + if (!tcp->socket_ts_enabled) { + uint32_t opt = grpc_core::kTimestampingSocketOptions; + if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, + static_cast<void*>(&opt), sizeof(opt)) != 0) { + *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp); + grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket."); + } + return false; + } + tcp->bytes_counter = -1; + tcp->socket_ts_enabled = true; + } + /* Set control message to indicate that you want timestamps. */ + union { + char cmsg_buf[CMSG_SPACE(sizeof(uint32_t))]; + struct cmsghdr align; + } u; + cmsghdr* cmsg = reinterpret_cast<cmsghdr*>(u.cmsg_buf); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SO_TIMESTAMPING; + cmsg->cmsg_len = CMSG_LEN(sizeof(uint32_t)); + *reinterpret_cast<int*>(CMSG_DATA(cmsg)) = + grpc_core::kTimestampingRecordingOptions; + msg->msg_control = u.cmsg_buf; + msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t)); + + /* If there was an error on sendmsg the logic in tcp_flush will handle it. */ + ssize_t length = tcp_send(tcp->fd, msg); + *sent_length = length; + /* Only save timestamps if all the bytes were taken by sendmsg. */ + if (sending_length == static_cast<size_t>(length)) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::AddNewEntry( + &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length), + tcp->outgoing_buffer_arg); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; + } + return true; +} + +/** Reads \a cmsg to derive timestamps from the control messages. If a valid + * timestamp is found, the traced buffer list is updated with this timestamp. + * The caller of this function should be looping on the control messages found + * in \a msg. \a cmsg should point to the control message that the caller wants + * processed. + * On return, a pointer to a control message is returned. On the next iteration, + * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */ +struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, + struct cmsghdr* cmsg) { + auto next_cmsg = CMSG_NXTHDR(msg, cmsg); + if (next_cmsg == nullptr) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Received timestamp without extended error"); + } + return cmsg; + } + + if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) || + !(next_cmsg->cmsg_type == IP_RECVERR || + next_cmsg->cmsg_type == IPV6_RECVERR)) { + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Unexpected control message"); + } + return cmsg; + } + + auto tss = + reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg)); + auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg)); + if (serr->ee_errno != ENOMSG || + serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { + gpr_log(GPR_ERROR, "Unexpected control message"); + return cmsg; + } + /* The error handling can potentially be done on another thread so we need + * to protect the traced buffer list. A lock free list might be better. Using + * a simple mutex for now. */ + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::ProcessTimestamp(&tcp->tb_head, serr, tss); + gpr_mu_unlock(&tcp->tb_mu); + return next_cmsg; +} + +/** For linux platforms, reads the socket's error queue and processes error + * messages from the queue. Returns true if all the errors processed were + * timestamps. Returns false if any of the errors were not timestamps. For + * non-linux platforms, error processing is not used/enabled currently. + */ +static bool process_errors(grpc_tcp* tcp) { + while (true) { + struct iovec iov; + iov.iov_base = nullptr; + iov.iov_len = 0; + struct msghdr msg; + msg.msg_name = nullptr; + msg.msg_namelen = 0; + msg.msg_iov = &iov; + msg.msg_iovlen = 0; + msg.msg_flags = 0; + + union { + char rbuf[1024 /*CMSG_SPACE(sizeof(scm_timestamping)) + + CMSG_SPACE(sizeof(sock_extended_err) + sizeof(sockaddr_in))*/]; + struct cmsghdr align; + } aligned_buf; + memset(&aligned_buf, 0, sizeof(aligned_buf)); + + msg.msg_control = aligned_buf.rbuf; + msg.msg_controllen = sizeof(aligned_buf.rbuf); + + int r, saved_errno; + do { + r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE); + saved_errno = errno; + } while (r < 0 && saved_errno == EINTR); + + if (r == -1 && saved_errno == EAGAIN) { + return true; /* No more errors to process */ + } + if (r == -1) { + return false; + } + if (grpc_tcp_trace.enabled()) { + if ((msg.msg_flags & MSG_CTRUNC) == 1) { + gpr_log(GPR_INFO, "Error message was truncated."); + } + } + + if (msg.msg_controllen == 0) { + /* There was no control message found. It was probably spurious. */ + return true; + } + for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET || + cmsg->cmsg_type != SCM_TIMESTAMPING) { + /* Got a control message that is not a timestamp. Don't know how to + * handle this. */ + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, + "unknown control message cmsg_level:%d cmsg_type:%d", + cmsg->cmsg_level, cmsg->cmsg_type); + } + return false; + } + process_timestamp(tcp, &msg, cmsg); + } + } +} + +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); + } + + if (error != GRPC_ERROR_NONE || + static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) { + /* We aren't going to register to hear on error anymore, so it is safe to + * unref. */ + grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "error-tracking"); + return; + } + + /* We are still interested in collecting timestamps, so let's try reading + * them. */ + if (!process_errors(tcp)) { + /* This was not a timestamps error. This was an actual error. Set the + * read and write closures to be ready. + */ + grpc_fd_set_readable(tcp->em_fd); + grpc_fd_set_writable(tcp->em_fd); + } + GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); +} + +#else /* GRPC_LINUX_ERRQUEUE */ +static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, + size_t sending_length, + ssize_t* sent_length, + grpc_error** error) { + gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); + GPR_ASSERT(0); + return false; +} + +static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + gpr_log(GPR_ERROR, "Error handling is not supported for this platform"); + GPR_ASSERT(0); +} +#endif /* GRPC_LINUX_ERRQUEUE */ + /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -557,19 +819,20 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = iov_size; - msg.msg_control = nullptr; - msg.msg_controllen = 0; msg.msg_flags = 0; + if (tcp->outgoing_buffer_arg != nullptr) { + if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, + error)) + return true; /* something went wrong with timestamps */ + } else { + msg.msg_control = nullptr; + msg.msg_controllen = 0; - GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); - GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); + GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); - GPR_TIMER_SCOPE("sendmsg", 1); - do { - /* TODO(klempner): Cork if this is a partial write */ - GRPC_STATS_INC_SYSCALL_WRITE(); - sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); - } while (sent_length < 0 && errno == EINTR); + sent_length = tcp_send(tcp->fd, &msg); + } if (sent_length < 0) { if (errno == EAGAIN) { @@ -593,6 +856,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } GPR_ASSERT(tcp->outgoing_byte_idx == 0); + tcp->bytes_counter += sent_length; trailing = sending_length - static_cast<size_t>(sent_length); while (trailing > 0) { size_t slice_length; @@ -607,7 +871,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { trailing -= slice_length; } } - if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); @@ -640,14 +903,13 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, - grpc_closure* cb) { + grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("tcp_write", 0); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_error* error = GRPC_ERROR_NONE; @@ -675,6 +937,10 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; + tcp->outgoing_buffer_arg = arg; + if (arg) { + GPR_ASSERT(grpc_event_engine_can_track_errors()); + } if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); @@ -792,6 +1058,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->bytes_read_this_round = 0; /* Will be set to false by the very first endpoint read function */ tcp->is_first_read = true; + tcp->bytes_counter = -1; + tcp->socket_ts_enabled = false; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -803,6 +1071,19 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); grpc_resource_quota_unref_internal(resource_quota); + gpr_mu_init(&tcp->tb_mu); + tcp->tb_head = nullptr; + /* Start being notified on errors if event engine can track errors. */ + if (grpc_event_engine_can_track_errors()) { + /* Grab a ref to tcp so that we can safely access the tcp struct when + * processing errors. We unref when we no longer want to track errors + * separately. */ + TCP_REF(tcp, "error-tracking"); + gpr_atm_rel_store(&tcp->stop_error_notification, 0); + GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); + } return &tcp->base; } @@ -821,6 +1102,11 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, tcp->release_fd = fd; tcp->release_fd_cb = done; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + if (grpc_event_engine_can_track_errors()) { + /* Stop errors notification. */ + gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); + grpc_fd_set_error(tcp->em_fd); + } TCP_UNREF(tcp, "destroy"); } |