diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 16 |
1 files changed, 13 insertions, 3 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 23cbc20910..78c8d1eed8 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -204,6 +204,13 @@ static void drop_uncovered(grpc_tcp* tcp) { GPR_ASSERT(old_count != 1); } +// gRPC API considers a Write operation to be done the moment it clears ‘flow +// control’ i.e., not necessarily sent on the wire. This means that the +// application MIGHT not call `grpc_completion_queue_next/pluck` in a timely +// manner when its `Write()` API is acked. +// +// We need to ensure that the fd is 'covered' (i.e being monitored by some +// polling thread and progress is made) and hence add it to a backup poller here static void cover_self(grpc_tcp* tcp) { backup_poller* p; gpr_atm old_count = @@ -468,7 +475,9 @@ static void tcp_do_read(grpc_tcp* tcp) { GRPC_STATS_INC_TCP_READ_SIZE(read_bytes); add_to_estimate(tcp, static_cast<size_t>(read_bytes)); GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); - if (static_cast<size_t>(read_bytes) < tcp->incoming_buffer->length) { + if (static_cast<size_t>(read_bytes) == tcp->incoming_buffer->length) { + finish_estimate(tcp); + } else if (static_cast<size_t>(read_bytes) < tcp->incoming_buffer->length) { grpc_slice_buffer_trim_end( tcp->incoming_buffer, tcp->incoming_buffer->length - static_cast<size_t>(read_bytes), @@ -498,7 +507,7 @@ static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { static void tcp_continue_read(grpc_tcp* tcp) { size_t target_read_size = get_target_read_size(tcp); - if (tcp->incoming_buffer->length < target_read_size && + if (tcp->incoming_buffer->length < target_read_size / 2 && tcp->incoming_buffer->count < MAX_READ_IOVEC) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_INFO, "TCP:%p alloc_slices", tcp); @@ -655,7 +664,8 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg, return cmsg; } - auto tss = reinterpret_cast<struct scm_timestamping*>(CMSG_DATA(cmsg)); + auto tss = + reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg)); auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg)); if (serr->ee_errno != ENOMSG || serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) { |