diff options
author | Yash Tibrewal <yashkt@google.com> | 2018-07-16 16:21:43 -0700 |
---|---|---|
committer | Yash Tibrewal <yashkt@google.com> | 2018-07-16 17:48:33 -0700 |
commit | 0d757a659fd24ecfed801614565d2408e2e302fc (patch) | |
tree | f6a8da5d47a0d8168d3ee2fc2bffd49dcefccfbb /src/core/lib/iomgr/tcp_posix.cc | |
parent | f0397933b007e2614ba38fc98f0ee6391a2eea9d (diff) |
Adding docs and cleaning up
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 79 |
1 files changed, 41 insertions, 38 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 97251e7bdc..4300a9f882 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -106,12 +106,16 @@ struct grpc_tcp { grpc_resource_user* resource_user; grpc_resource_user_slice_allocator slice_allocator; - grpc_core::TracedBuffer* head; - gpr_mu traced_buffer_lock; - void* outgoing_buffer_arg; - int bytes_counter; - bool socket_ts_enabled; - gpr_atm stop_error_notification; + grpc_core::TracedBuffer* head; /* List of traced buffers */ + gpr_mu traced_buffer_lock; /* Lock for access to list of traced buffers */ + void* outgoing_buffer_arg; /* buffer arg provided on grpc_endpoint_write */ + int bytes_counter; /* Current TCP relative sequence number. Used for + timestamping traced buffers. */ + bool socket_ts_enabled; /* True if timestamping options are set on the socket + */ + gpr_atm + stop_error_notification; /* Set to 1 if we do not want to be notified on + errors anymore */ }; struct backup_poller { @@ -360,7 +364,6 @@ static void tcp_destroy(grpc_endpoint* ep) { grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { - // gpr_log(GPR_INFO, "stop errors"); gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_notify_on_error(tcp->em_fd, nullptr); } @@ -539,6 +542,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, size_t sending_length, ssize_t* sent_length, grpc_error** error); + +/** The callback function to be invoked when we get an error on the socket. */ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error); #ifdef GRPC_LINUX_ERRQUEUE @@ -547,13 +552,14 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, ssize_t* sent_length, grpc_error** error) { if (!tcp->socket_ts_enabled) { - // gpr_log(GPR_INFO, "setting options yo"); uint32_t opt = grpc_core::kTimestampingSocketOptions; if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt), sizeof(opt)) != 0) { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); - gpr_log(GPR_INFO, "failed to set"); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket."); + } return false; } tcp->socket_ts_enabled = true; @@ -589,18 +595,29 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, return true; } +/** Reads \a cmsg to derive timestamps from the control messages. If a valid + * timestamp is found, the traced buffer list is updated with this timestamp. + * The caller of this function should be looping on the control messages found + * in \a msg. \a cmsg should point to the control message that the caller wants + * processed. + * On return, a pointer to a control message is returned. On the next iteration, + * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, struct cmsghdr* cmsg) { auto next_cmsg = CMSG_NXTHDR(msg, cmsg); if (next_cmsg == nullptr) { - gpr_log(GPR_ERROR, "Received timestamp without extended error"); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Received timestamp without extended error"); + } return cmsg; } if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) || !(next_cmsg->cmsg_type == IP_RECVERR || next_cmsg->cmsg_type == IPV6_RECVERR)) { - gpr_log(GPR_ERROR, "Unexpected cmsg"); + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_ERROR, "Unexpected control message"); + } return cmsg; } @@ -609,14 +626,13 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg)); if (serr->ee_errno != ENOMSG || serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { - gpr_log(GPR_ERROR, "Unexpected cmsg"); + gpr_log(GPR_ERROR, "Unexpected control message"); return cmsg; } /* The error handling can potentially be done on another thread so we need * to protect the traced buffer list. A lock free list might be better. Using * a simple mutex for now. */ gpr_mu_lock(&tcp->traced_buffer_lock); - // gpr_log(GPR_INFO, "processing timestamp"); grpc_core::TracedBuffer::ProcessTimestamp(&tcp->head, serr, tss); gpr_mu_unlock(&tcp->traced_buffer_lock); return next_cmsg; @@ -624,14 +640,12 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, /** For linux platforms, reads the socket's error queue and processes error * messages from the queue. Returns true if all the errors processed were - * timestamps. Returns false if the any of the errors were not timestamps. For + * timestamps. Returns false if any of the errors were not timestamps. For * non-linux platforms, error processing is not enabled currently, and hence * crashes out. */ static bool process_errors(grpc_tcp* tcp) { - // gpr_log(GPR_INFO, "process errors"); while (true) { - // gpr_log(GPR_INFO, "looping"); struct iovec iov; iov.iov_base = nullptr; iov.iov_len = 0; @@ -654,24 +668,22 @@ static bool process_errors(grpc_tcp* tcp) { int r, saved_errno; do { - // gpr_log(GPR_INFO, "error recvmsg"); r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE); saved_errno = errno; } while (r < 0 && saved_errno == EINTR); if (r == -1 && saved_errno == EAGAIN) { - // gpr_log(GPR_INFO, "here"); return true; /* No more errors to process */ } if (r == -1) { - // gpr_log(GPR_INFO, "%d", saved_errno); return false; } - if ((msg.msg_flags & MSG_CTRUNC) == 1) { - gpr_log(GPR_INFO, "Error message was truncated."); + if (grpc_tcp_trace.enabled()) { + if ((msg.msg_flags & MSG_CTRUNC) == 1) { + gpr_log(GPR_INFO, "Error message was truncated."); + } } - // gpr_log(GPR_INFO, "%d %lu", r, msg.msg_controllen); if (msg.msg_controllen == 0) { /* There was no control message read. Return now */ return true; @@ -680,10 +692,12 @@ static bool process_errors(grpc_tcp* tcp) { cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_TIMESTAMPING) { - /* Got a weird one, not a timestamp */ - gpr_log(GPR_INFO, "weird %d %d %d", r, cmsg->cmsg_level, - cmsg->cmsg_type); - continue; + /* Got a weird control message, not a timestamp */ + if (grpc_tcp_trace.enabled()) { + gpr_log(GPR_INFO, "weird control message cmsg_level:%d cmsg_type:%d", + cmsg->cmsg_level, cmsg->cmsg_type); + } + return false; } process_timestamp(tcp, &msg, cmsg); } @@ -691,7 +705,6 @@ static bool process_errors(grpc_tcp* tcp) { } static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { - // gpr_log(GPR_INFO, "grpc_tcp_handle_error"); grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); @@ -701,15 +714,10 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) { /* We aren't going to register to hear on error anymore, so it is safe to * unref. */ - // gpr_log(GPR_INFO, "%p %d", error, - // static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))); - // gpr_log(GPR_INFO, "unref"); grpc_core::TracedBuffer::Shutdown(&tcp->head, GRPC_ERROR_REF(error)); TCP_UNREF(tcp, "error"); - // gpr_log(GPR_INFO, "here"); } else { if (!process_errors(tcp)) { - // gpr_log(GPR_INFO, "no timestamps"); /* This was not a timestamps error. This was an actual error. Set the * read and write closures to be ready. */ @@ -719,7 +727,6 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure); - // gpr_log(GPR_INFO, "udhar se"); } } @@ -798,7 +805,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); } while (sent_length < 0 && errno == EINTR); } - // gpr_log(GPR_INFO, "sent length %ld", sent_length); if (sent_length < 0) { if (errno == EAGAIN) { @@ -869,7 +875,6 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - // gpr_log(GPR_INFO, "scheduling callback"); GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } @@ -913,14 +918,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "write: delayed"); } - // gpr_log(GPR_INFO, "notify"); notify_on_write(tcp); } else { if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); } - // gpr_log(GPR_INFO, "sched"); GRPC_CLOSURE_SCHED(cb, error); } } @@ -1069,7 +1072,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, tcp->release_fd_cb = done; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { - // gpr_log(GPR_INFO, "stop errors"); + /* Stop errors notification. */ gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_notify_on_error(tcp->em_fd, nullptr); } |