aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_posix.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2018-07-16 16:21:43 -0700
committerGravatar Yash Tibrewal <yashkt@google.com>2018-07-16 17:48:33 -0700
commit0d757a659fd24ecfed801614565d2408e2e302fc (patch)
treef6a8da5d47a0d8168d3ee2fc2bffd49dcefccfbb /src/core/lib/iomgr/tcp_posix.cc
parentf0397933b007e2614ba38fc98f0ee6391a2eea9d (diff)
Adding docs and cleaning up
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc79
1 files changed, 41 insertions, 38 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 97251e7bdc..4300a9f882 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -106,12 +106,16 @@ struct grpc_tcp {
grpc_resource_user* resource_user;
grpc_resource_user_slice_allocator slice_allocator;
- grpc_core::TracedBuffer* head;
- gpr_mu traced_buffer_lock;
- void* outgoing_buffer_arg;
- int bytes_counter;
- bool socket_ts_enabled;
- gpr_atm stop_error_notification;
+ grpc_core::TracedBuffer* head; /* List of traced buffers */
+ gpr_mu traced_buffer_lock; /* Lock for access to list of traced buffers */
+ void* outgoing_buffer_arg; /* buffer arg provided on grpc_endpoint_write */
+ int bytes_counter; /* Current TCP relative sequence number. Used for
+ timestamping traced buffers. */
+ bool socket_ts_enabled; /* True if timestamping options are set on the socket
+ */
+ gpr_atm
+ stop_error_notification; /* Set to 1 if we do not want to be notified on
+ errors anymore */
};
struct backup_poller {
@@ -360,7 +364,6 @@ static void tcp_destroy(grpc_endpoint* ep) {
grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep);
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
- // gpr_log(GPR_INFO, "stop errors");
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_notify_on_error(tcp->em_fd, nullptr);
}
@@ -539,6 +542,8 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length, grpc_error** error);
+
+/** The callback function to be invoked when we get an error on the socket. */
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error);
#ifdef GRPC_LINUX_ERRQUEUE
@@ -547,13 +552,14 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
ssize_t* sent_length,
grpc_error** error) {
if (!tcp->socket_ts_enabled) {
- // gpr_log(GPR_INFO, "setting options yo");
uint32_t opt = grpc_core::kTimestampingSocketOptions;
if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING,
static_cast<void*>(&opt), sizeof(opt)) != 0) {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "setsockopt"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
- gpr_log(GPR_INFO, "failed to set");
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Failed to set timestamping options on the socket.");
+ }
return false;
}
tcp->socket_ts_enabled = true;
@@ -589,18 +595,29 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
return true;
}
+/** Reads \a cmsg to derive timestamps from the control messages. If a valid
+ * timestamp is found, the traced buffer list is updated with this timestamp.
+ * The caller of this function should be looping on the control messages found
+ * in \a msg. \a cmsg should point to the control message that the caller wants
+ * processed.
+ * On return, a pointer to a control message is returned. On the next iteration,
+ * CMSG_NXTHDR(msg, ret_val) should be passed as \a cmsg. */
struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
struct cmsghdr* cmsg) {
auto next_cmsg = CMSG_NXTHDR(msg, cmsg);
if (next_cmsg == nullptr) {
- gpr_log(GPR_ERROR, "Received timestamp without extended error");
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Received timestamp without extended error");
+ }
return cmsg;
}
if (!(next_cmsg->cmsg_level == SOL_IP || next_cmsg->cmsg_level == SOL_IPV6) ||
!(next_cmsg->cmsg_type == IP_RECVERR ||
next_cmsg->cmsg_type == IPV6_RECVERR)) {
- gpr_log(GPR_ERROR, "Unexpected cmsg");
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_ERROR, "Unexpected control message");
+ }
return cmsg;
}
@@ -609,14 +626,13 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
if (serr->ee_errno != ENOMSG ||
serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {
- gpr_log(GPR_ERROR, "Unexpected cmsg");
+ gpr_log(GPR_ERROR, "Unexpected control message");
return cmsg;
}
/* The error handling can potentially be done on another thread so we need
* to protect the traced buffer list. A lock free list might be better. Using
* a simple mutex for now. */
gpr_mu_lock(&tcp->traced_buffer_lock);
- // gpr_log(GPR_INFO, "processing timestamp");
grpc_core::TracedBuffer::ProcessTimestamp(&tcp->head, serr, tss);
gpr_mu_unlock(&tcp->traced_buffer_lock);
return next_cmsg;
@@ -624,14 +640,12 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
/** For linux platforms, reads the socket's error queue and processes error
* messages from the queue. Returns true if all the errors processed were
- * timestamps. Returns false if the any of the errors were not timestamps. For
+ * timestamps. Returns false if any of the errors were not timestamps. For
* non-linux platforms, error processing is not enabled currently, and hence
* crashes out.
*/
static bool process_errors(grpc_tcp* tcp) {
- // gpr_log(GPR_INFO, "process errors");
while (true) {
- // gpr_log(GPR_INFO, "looping");
struct iovec iov;
iov.iov_base = nullptr;
iov.iov_len = 0;
@@ -654,24 +668,22 @@ static bool process_errors(grpc_tcp* tcp) {
int r, saved_errno;
do {
- // gpr_log(GPR_INFO, "error recvmsg");
r = recvmsg(tcp->fd, &msg, MSG_ERRQUEUE);
saved_errno = errno;
} while (r < 0 && saved_errno == EINTR);
if (r == -1 && saved_errno == EAGAIN) {
- // gpr_log(GPR_INFO, "here");
return true; /* No more errors to process */
}
if (r == -1) {
- // gpr_log(GPR_INFO, "%d", saved_errno);
return false;
}
- if ((msg.msg_flags & MSG_CTRUNC) == 1) {
- gpr_log(GPR_INFO, "Error message was truncated.");
+ if (grpc_tcp_trace.enabled()) {
+ if ((msg.msg_flags & MSG_CTRUNC) == 1) {
+ gpr_log(GPR_INFO, "Error message was truncated.");
+ }
}
- // gpr_log(GPR_INFO, "%d %lu", r, msg.msg_controllen);
if (msg.msg_controllen == 0) {
/* There was no control message read. Return now */
return true;
@@ -680,10 +692,12 @@ static bool process_errors(grpc_tcp* tcp) {
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET ||
cmsg->cmsg_type != SCM_TIMESTAMPING) {
- /* Got a weird one, not a timestamp */
- gpr_log(GPR_INFO, "weird %d %d %d", r, cmsg->cmsg_level,
- cmsg->cmsg_type);
- continue;
+ /* Got a weird control message, not a timestamp */
+ if (grpc_tcp_trace.enabled()) {
+ gpr_log(GPR_INFO, "weird control message cmsg_level:%d cmsg_type:%d",
+ cmsg->cmsg_level, cmsg->cmsg_type);
+ }
+ return false;
}
process_timestamp(tcp, &msg, cmsg);
}
@@ -691,7 +705,6 @@ static bool process_errors(grpc_tcp* tcp) {
}
static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
- // gpr_log(GPR_INFO, "grpc_tcp_handle_error");
grpc_tcp* tcp = static_cast<grpc_tcp*>(arg);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error));
@@ -701,15 +714,10 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) {
/* We aren't going to register to hear on error anymore, so it is safe to
* unref. */
- // gpr_log(GPR_INFO, "%p %d", error,
- // static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification)));
- // gpr_log(GPR_INFO, "unref");
grpc_core::TracedBuffer::Shutdown(&tcp->head, GRPC_ERROR_REF(error));
TCP_UNREF(tcp, "error");
- // gpr_log(GPR_INFO, "here");
} else {
if (!process_errors(tcp)) {
- // gpr_log(GPR_INFO, "no timestamps");
/* This was not a timestamps error. This was an actual error. Set the
* read and write closures to be ready.
*/
@@ -719,7 +727,6 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) {
GRPC_CLOSURE_INIT(&tcp->error_closure, tcp_handle_error, tcp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_error(tcp->em_fd, &tcp->error_closure);
- // gpr_log(GPR_INFO, "udhar se");
}
}
@@ -798,7 +805,6 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
} while (sent_length < 0 && errno == EINTR);
}
- // gpr_log(GPR_INFO, "sent length %ld", sent_length);
if (sent_length < 0) {
if (errno == EAGAIN) {
@@ -869,7 +875,6 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
- // gpr_log(GPR_INFO, "scheduling callback");
GRPC_CLOSURE_SCHED(cb, error);
TCP_UNREF(tcp, "write");
}
@@ -913,14 +918,12 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "write: delayed");
}
- // gpr_log(GPR_INFO, "notify");
notify_on_write(tcp);
} else {
if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_INFO, "write: %s", str);
}
- // gpr_log(GPR_INFO, "sched");
GRPC_CLOSURE_SCHED(cb, error);
}
}
@@ -1069,7 +1072,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
tcp->release_fd_cb = done;
grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
if (grpc_event_engine_can_track_errors()) {
- // gpr_log(GPR_INFO, "stop errors");
+ /* Stop errors notification. */
gpr_atm_no_barrier_store(&tcp->stop_error_notification, true);
grpc_fd_notify_on_error(tcp->em_fd, nullptr);
}