diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 130 |
1 files changed, 95 insertions, 35 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index aa2704ce26..c268c18664 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -126,6 +126,7 @@ struct grpc_tcp { int bytes_counter; bool socket_ts_enabled; /* True if timestamping options are set on the socket */ + bool ts_capable; /* Cache whether we can set timestamping options */ gpr_atm stop_error_notification; /* Set to 1 if we do not want to be notified on errors anymore */ @@ -260,10 +261,17 @@ static void notify_on_write(grpc_tcp* tcp) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp); } - cover_self(tcp); - GRPC_CLOSURE_INIT(&tcp->write_done_closure, - tcp_drop_uncovered_then_handle_write, tcp, - grpc_schedule_on_exec_ctx); + if (grpc_event_engine_run_in_background()) { + // If there is a polling engine always running in the background, there is + // no need to run the backup poller. + GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp, + grpc_schedule_on_exec_ctx); + } else { + cover_self(tcp); + GRPC_CLOSURE_INIT(&tcp->write_done_closure, + tcp_drop_uncovered_then_handle_write, tcp, + grpc_schedule_on_exec_ctx); + } grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure); } @@ -384,6 +392,12 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } @@ -576,7 +590,7 @@ ssize_t tcp_send(int fd, const struct msghdr* msg) { */ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, grpc_error** error); + ssize_t* sent_length); /** The callback function to be invoked when we get an error on the socket. */ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); @@ -584,13 +598,11 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); #ifdef GRPC_LINUX_ERRQUEUE static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, - grpc_error** error) { + ssize_t* sent_length) { if (!tcp->socket_ts_enabled) { uint32_t opt = grpc_core::kTimestampingSocketOptions; if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt), sizeof(opt)) != 0) { - *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket."); @@ -621,7 +633,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, if (sending_length == static_cast<size_t>(length)) { gpr_mu_lock(&tcp->tb_mu); grpc_core::TracedBuffer::AddNewEntry( - &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length), + &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length), tcp->outgoing_buffer_arg); gpr_mu_unlock(&tcp->tb_mu); tcp->outgoing_buffer_arg = nullptr; @@ -673,11 +685,9 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, } /** For linux platforms, reads the socket's error queue and processes error - * messages from the queue. Returns true if all the errors processed were - * timestamps. Returns false if any of the errors were not timestamps. For - * non-linux platforms, error processing is not used/enabled currently. + * messages from the queue. */ -static bool process_errors(grpc_tcp* tcp) { +static void process_errors(grpc_tcp* tcp) { while (true) { struct iovec iov; iov.iov_base = nullptr; @@ -706,10 +716,10 @@ static bool process_errors(grpc_tcp* tcp) { } while (r < 0 && saved_errno == EINTR); if (r == -1 && saved_errno == EAGAIN) { - return true; /* No more errors to process */ + return; /* No more errors to process */ } if (r == -1) { - return false; + return; } if (grpc_tcp_trace.enabled()) { if ((msg.msg_flags & MSG_CTRUNC) == 1) { @@ -719,8 +729,9 @@ static bool process_errors(grpc_tcp* tcp) { if (msg.msg_controllen == 0) { /* There was no control message found. It was probably spurious. */ - return true; + return; } + bool seen = false; for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len; cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_level != SOL_SOCKET || @@ -732,9 +743,13 @@ static bool process_errors(grpc_tcp* tcp) { "unknown control message cmsg_level:%d cmsg_type:%d", cmsg->cmsg_level, cmsg->cmsg_type); } - return false; + return; } - process_timestamp(tcp, &msg, cmsg); + cmsg = process_timestamp(tcp, &msg, cmsg); + seen = true; + } + if (!seen) { + return; } } } @@ -749,20 +764,17 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) { /* We aren't going to register to hear on error anymore, so it is safe to * unref. */ - grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "error-tracking"); return; } /* We are still interested in collecting timestamps, so let's try reading * them. */ - if (!process_errors(tcp)) { - /* This was not a timestamps error. This was an actual error. Set the - * read and write closures to be ready. - */ - grpc_fd_set_readable(tcp->em_fd); - grpc_fd_set_writable(tcp->em_fd); - } + process_errors(tcp); + /* This might not a timestamps error. Set the read and write closures to be + * ready. */ + grpc_fd_set_readable(tcp->em_fd); + grpc_fd_set_writable(tcp->em_fd); GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); @@ -771,8 +783,7 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { #else /* GRPC_LINUX_ERRQUEUE */ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, - ssize_t* sent_length, - grpc_error** error) { + ssize_t* sent_length) { gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform"); GPR_ASSERT(0); return false; @@ -784,6 +795,19 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { } #endif /* GRPC_LINUX_ERRQUEUE */ +/* If outgoing_buffer_arg is filled, shuts down the list early, so that any + * release operations needed can be performed on the arg */ +void tcp_shutdown_buffer_list(grpc_tcp* tcp) { + if (tcp->outgoing_buffer_arg) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("TracedBuffer list shutdown")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; + } +} + /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -794,7 +818,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; - ssize_t sent_length; + ssize_t sent_length = 0; size_t sending_length; size_t trailing; size_t unwind_slice_idx; @@ -829,11 +853,19 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_iov = iov; msg.msg_iovlen = iov_size; msg.msg_flags = 0; + bool tried_sending_message = false; if (tcp->outgoing_buffer_arg != nullptr) { - if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, - error)) - return true; /* something went wrong with timestamps */ - } else { + if (!tcp->ts_capable || + !tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) { + /* We could not set socket options to collect Fathom timestamps. + * Fallback on writing without timestamps. */ + tcp->ts_capable = false; + tcp_shutdown_buffer_list(tcp); + } else { + tried_sending_message = true; + } + } + if (!tried_sending_message) { msg.msg_control = nullptr; msg.msg_controllen = 0; @@ -856,10 +888,12 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } else if (errno == EPIPE) { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } } @@ -936,17 +970,18 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, GPR_ASSERT(tcp->write_cb == nullptr); + tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { GRPC_CLOSURE_SCHED( cb, grpc_fd_is_shutdown(tcp->em_fd) ? tcp_annotate_error( GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) : GRPC_ERROR_NONE); + tcp_shutdown_buffer_list(tcp); return; } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; - tcp->outgoing_buffer_arg = arg; if (arg) { GPR_ASSERT(grpc_event_engine_can_track_errors()); } @@ -999,6 +1034,22 @@ static grpc_resource_user* tcp_get_resource_user(grpc_endpoint* ep) { return tcp->resource_user; } +static bool tcp_can_track_err(grpc_endpoint* ep) { + grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + if (!grpc_event_engine_can_track_errors()) { + return false; + } + struct sockaddr addr; + socklen_t len = sizeof(addr); + if (getsockname(tcp->fd, &addr, &len) < 0) { + return false; + } + if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) { + return true; + } + return false; +} + static const grpc_endpoint_vtable vtable = {tcp_read, tcp_write, tcp_add_to_pollset, @@ -1008,7 +1059,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; + tcp_get_fd, + tcp_can_track_err}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 @@ -1069,6 +1121,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->is_first_read = true; tcp->bytes_counter = -1; tcp->socket_ts_enabled = false; + tcp->ts_capable = true; + tcp->outgoing_buffer_arg = nullptr; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -1113,6 +1167,12 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { /* Stop errors notification. */ + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } |