diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 93 |
1 files changed, 52 insertions, 41 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index cb90933e31..d09cfca9af 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -61,7 +61,7 @@ typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; typedef size_t msg_iovlen_type; #endif -grpc_tracer_flag grpc_tcp_trace = GRPC_TRACER_INITIALIZER(false, "tcp"); +grpc_core::TraceFlag grpc_tcp_trace(false, "tcp"); typedef struct { grpc_endpoint base; @@ -81,9 +81,7 @@ typedef struct { grpc_slice_buffer* incoming_buffer; grpc_slice_buffer* outgoing_buffer; - /** slice within outgoing_buffer to write next */ - size_t outgoing_slice_idx; - /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */ + /** byte within outgoing_buffer->slices[0] to write next */ size_t outgoing_byte_idx; grpc_closure* read_cb; @@ -121,7 +119,7 @@ static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx, static void done_poller(grpc_exec_ctx* exec_ctx, void* bp, grpc_error* error_ignored) { backup_poller* p = (backup_poller*)bp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p); } grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p)); @@ -131,7 +129,7 @@ static void done_poller(grpc_exec_ctx* exec_ctx, void* bp, static void run_poller(grpc_exec_ctx* exec_ctx, void* bp, grpc_error* error_ignored) { backup_poller* p = (backup_poller*)bp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); @@ -147,18 +145,18 @@ static void run_poller(grpc_exec_ctx* exec_ctx, void* bp, gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) { gpr_mu_lock(p->pollset_mu); bool cas_ok = gpr_atm_full_cas(&g_backup_poller, (gpr_atm)p, 0); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); } gpr_mu_unlock(p->pollset_mu); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); } grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, grpc_schedule_on_exec_ctx)); } else { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); } GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); @@ -169,7 +167,7 @@ static void drop_uncovered(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { backup_poller* p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller); gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count, (int)old_count - 1); } @@ -180,14 +178,14 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { backup_poller* p; gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count, 2 + (int)old_count); } if (old_count == 0) { GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); p = (backup_poller*)gpr_zalloc(sizeof(*p) + grpc_pollset_size()); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); } grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); @@ -203,7 +201,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { // spin waiting for backup poller } } - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); } grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); @@ -213,7 +211,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { } static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); } GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, @@ -222,7 +220,7 @@ static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { } static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); } cover_self(exec_ctx, tcp); @@ -234,7 +232,7 @@ static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); } drop_uncovered(exec_ctx, (grpc_tcp*)arg); @@ -311,7 +309,7 @@ static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, @@ -324,7 +322,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file, int line) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, tcp, reason, val, @@ -355,7 +353,7 @@ static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, grpc_error* error) { grpc_closure* cb = tcp->read_cb; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; const char* str = grpc_error_string(error); @@ -451,7 +449,7 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)tcpp; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, grpc_error_string(error)); } @@ -470,13 +468,13 @@ static void tcp_continue_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp); } grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, target_read_size, 1, tcp->incoming_buffer); } else { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp); } tcp_do_read(exec_ctx, tcp); @@ -487,7 +485,7 @@ static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)arg; GPR_ASSERT(!tcp->finished_edge); - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); } @@ -532,23 +530,26 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, size_t unwind_slice_idx; size_t unwind_byte_idx; + // We always start at zero, because we eagerly unref and trim the slice + // buffer as we write + size_t outgoing_slice_idx = 0; + for (;;) { sending_length = 0; - unwind_slice_idx = tcp->outgoing_slice_idx; + unwind_slice_idx = outgoing_slice_idx; unwind_byte_idx = tcp->outgoing_byte_idx; - for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count && + for (iov_size = 0; outgoing_slice_idx != tcp->outgoing_buffer->count && iov_size != MAX_WRITE_IOVEC; iov_size++) { iov[iov_size].iov_base = GRPC_SLICE_START_PTR( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) + + tcp->outgoing_buffer->slices[outgoing_slice_idx]) + tcp->outgoing_byte_idx; iov[iov_size].iov_len = - GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) - + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]) - tcp->outgoing_byte_idx; sending_length += iov[iov_size].iov_len; - tcp->outgoing_slice_idx++; + outgoing_slice_idx++; tcp->outgoing_byte_idx = 0; } GPR_ASSERT(iov_size > 0); @@ -574,16 +575,25 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (sent_length < 0) { if (errno == EAGAIN) { - tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; + // unref all and forget about all slices that have been written to this + // point + for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { + grpc_slice_unref_internal( + exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer)); + } return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } } @@ -593,9 +603,9 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, while (trailing > 0) { size_t slice_length; - tcp->outgoing_slice_idx--; - slice_length = GRPC_SLICE_LENGTH( - tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]); + outgoing_slice_idx--; + slice_length = + GRPC_SLICE_LENGTH(tcp->outgoing_buffer->slices[outgoing_slice_idx]); if (slice_length > trailing) { tcp->outgoing_byte_idx = slice_length - trailing; break; @@ -604,11 +614,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, } } - if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { + if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + tcp->outgoing_buffer); return true; } - }; + } } static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, @@ -625,14 +637,14 @@ static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, } if (!tcp_flush(exec_ctx, tcp, &error)) { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "write: delayed"); } notify_on_write(exec_ctx, tcp); } else { cb = tcp->write_cb; tcp->write_cb = nullptr; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } @@ -647,7 +659,7 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_tcp* tcp = (grpc_tcp*)ep; grpc_error* error = GRPC_ERROR_NONE; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { size_t i; for (i = 0; i < buf->count; i++) { @@ -672,18 +684,17 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, return; } tcp->outgoing_buffer = buf; - tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; if (!tcp_flush(exec_ctx, tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "write: delayed"); } notify_on_write(exec_ctx, tcp); } else { - if (GRPC_TRACER_ON(grpc_tcp_trace)) { + if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } |