diff options
author | 2018-08-26 03:50:39 -0700 | |
---|---|---|
committer | 2018-08-26 03:50:39 -0700 | |
commit | 9c5bde5e4e2cc0c81c3c6411b3aa922d3e995a54 (patch) | |
tree | 270d52c92efbf923855cd24efab620029af0aa7d /src/core/lib/iomgr/tcp_posix.cc | |
parent | f71fd84e42cd8053de35df7d6137f7c2e2407ce3 (diff) |
More commits
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 68 |
1 files changed, 64 insertions, 4 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 1db2790265..23cbc20910 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -375,8 +375,15 @@ static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); } static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + gpr_log(GPR_INFO, "tcp destroy %p %p", ep, tcp->outgoing_buffer_arg); grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } @@ -578,6 +585,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, ssize_t* sent_length, grpc_error** error) { if (!tcp->socket_ts_enabled) { + gpr_log(GPR_INFO, "set timestamps"); uint32_t opt = grpc_core::kTimestampingSocketOptions; if (setsockopt(tcp->fd, SOL_SOCKET, SO_TIMESTAMPING, static_cast<void*>(&opt), sizeof(opt)) != 0) { @@ -610,6 +618,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg, *sent_length = length; /* Only save timestamps if all the bytes were taken by sendmsg. */ if (sending_length == static_cast<size_t>(length)) { + gpr_log(GPR_INFO, "tcp new entry %p %p", tcp, tcp->outgoing_buffer_arg); gpr_mu_lock(&tcp->tb_mu); grpc_core::TracedBuffer::AddNewEntry( &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length), @@ -668,6 +677,7 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, * non-linux platforms, error processing is not used/enabled currently. */ static bool process_errors(grpc_tcp* tcp) { + gpr_log(GPR_INFO, "process errors"); while (true) { struct iovec iov; iov.iov_base = nullptr; @@ -730,6 +740,8 @@ static bool process_errors(grpc_tcp* tcp) { } static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { + gpr_log(GPR_INFO, "handle error %p", arg); + GRPC_LOG_IF_ERROR("handle error", GRPC_ERROR_REF(error)); grpc_tcp* tcp = static_cast<grpc_tcp*>(arg); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p got_error: %s", tcp, grpc_error_string(error)); @@ -739,7 +751,8 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))) { /* We aren't going to register to hear on error anymore, so it is safe to * unref. */ - grpc_core::TracedBuffer::Shutdown(&tcp->tb_head, GRPC_ERROR_REF(error)); + gpr_log(GPR_INFO, "%p %d early return", error, + static_cast<bool>(gpr_atm_acq_load(&tcp->stop_error_notification))); TCP_UNREF(tcp, "error-tracking"); return; } @@ -774,6 +787,17 @@ static void tcp_handle_error(void* arg /* grpc_tcp */, grpc_error* error) { } #endif /* GRPC_LINUX_ERRQUEUE */ +void tcp_shutdown_buffer_list(grpc_tcp* tcp) { + if (tcp->outgoing_buffer_arg) { + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; + } +} + /* returns true if done, false if pending; if returning true, *error is set */ #if defined(IOV_MAX) && IOV_MAX < 1000 #define MAX_WRITE_IOVEC IOV_MAX @@ -821,8 +845,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { msg.msg_flags = 0; if (tcp->outgoing_buffer_arg != nullptr) { if (!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length, - error)) + error)) { + gpr_log(GPR_INFO, "something went wrong"); + tcp_shutdown_buffer_list(tcp); return true; /* something went wrong with timestamps */ + } } else { msg.msg_control = nullptr; msg.msg_controllen = 0; @@ -844,12 +871,16 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { } return false; } else if (errno == EPIPE) { + gpr_log(GPR_INFO, "something went wrong"); *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } else { + gpr_log(GPR_INFO, "something went wrong"); *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); + tcp_shutdown_buffer_list(tcp); return true; } } @@ -910,6 +941,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, grpc_closure* cb, void* arg) { GPR_TIMER_SCOPE("tcp_write", 0); + gpr_log(GPR_INFO, "tcp_write %p %p", ep, arg); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); grpc_error* error = GRPC_ERROR_NONE; @@ -926,17 +958,18 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, GPR_ASSERT(tcp->write_cb == nullptr); + tcp->outgoing_buffer_arg = arg; if (buf->length == 0) { GRPC_CLOSURE_SCHED( cb, grpc_fd_is_shutdown(tcp->em_fd) ? tcp_annotate_error( GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) : GRPC_ERROR_NONE); + tcp_shutdown_buffer_list(tcp); return; } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; - tcp->outgoing_buffer_arg = arg; if (arg) { GPR_ASSERT(grpc_event_engine_can_track_errors()); } @@ -949,6 +982,7 @@ static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, } notify_on_write(tcp); } else { + gpr_log(GPR_INFO, "imm sched"); if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_INFO, "write: %s", str); @@ -989,6 +1023,23 @@ static grpc_resource_user* tcp_get_resource_user(grpc_endpoint* ep) { return tcp->resource_user; } +static bool tcp_can_track_err(grpc_endpoint* ep) { + grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + if (!grpc_event_engine_can_track_errors()) { + return false; + } + struct sockaddr addr; + socklen_t len = sizeof(addr); + if (getsockname(tcp->fd, &addr, &len) < 0) { + gpr_log(GPR_ERROR, "getsockname"); + return false; + } + if (addr.sa_family == AF_INET || addr.sa_family == AF_INET6) { + return true; + } + return false; +} + static const grpc_endpoint_vtable vtable = {tcp_read, tcp_write, tcp_add_to_pollset, @@ -998,7 +1049,8 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; + tcp_get_fd, + tcp_can_track_err}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 @@ -1059,6 +1111,7 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, tcp->is_first_read = true; tcp->bytes_counter = -1; tcp->socket_ts_enabled = false; + tcp->outgoing_buffer_arg = nullptr; /* paired with unref in grpc_tcp_destroy */ gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); @@ -1097,12 +1150,19 @@ void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, grpc_closure* done) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = reinterpret_cast<grpc_tcp*>(ep); + gpr_log(GPR_INFO, "destroy and release %p %p", ep, tcp->outgoing_buffer_arg); GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); if (grpc_event_engine_can_track_errors()) { /* Stop errors notification. */ + gpr_mu_lock(&tcp->tb_mu); + grpc_core::TracedBuffer::Shutdown( + &tcp->tb_head, tcp->outgoing_buffer_arg, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("endpoint destroyed")); + gpr_mu_unlock(&tcp->tb_mu); + tcp->outgoing_buffer_arg = nullptr; gpr_atm_no_barrier_store(&tcp->stop_error_notification, true); grpc_fd_set_error(tcp->em_fd); } |