aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_posix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc16
1 files changed, 13 insertions, 3 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 23cbc20910..78c8d1eed8 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -204,6 +204,13 @@ static void drop_uncovered(grpc_tcp* tcp) {
GPR_ASSERT(old_count != 1);
}
+// gRPC API considers a Write operation to be done the moment it clears ‘flow
+// control’ i.e., not necessarily sent on the wire. This means that the
+// application MIGHT not call `grpc_completion_queue_next/pluck` in a timely
+// manner when its `Write()` API is acked.
+//
+// We need to ensure that the fd is 'covered' (i.e being monitored by some
+// polling thread and progress is made) and hence add it to a backup poller here
static void cover_self(grpc_tcp* tcp) {
backup_poller* p;
gpr_atm old_count =
@@ -468,7 +475,9 @@ static void tcp_do_read(grpc_tcp* tcp) {
GRPC_STATS_INC_TCP_READ_SIZE(read_bytes);
add_to_estimate(tcp, static_cast<size_t>(read_bytes));
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
- if (static_cast<size_t>(read_bytes) < tcp->incoming_buffer->length) {
+ if (static_cast<size_t>(read_bytes) == tcp->incoming_buffer->length) {
+ finish_estimate(tcp);
+ } else if (static_cast<size_t>(read_bytes) < tcp->incoming_buffer->length) {
grpc_slice_buffer_trim_end(
tcp->incoming_buffer,
tcp->incoming_buffer->length - static_cast<size_t>(read_bytes),
@@ -498,7 +507,7 @@ static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
static void tcp_continue_read(grpc_tcp* tcp) {
size_t target_read_size = get_target_read_size(tcp);
- if (tcp->incoming_buffer->length < target_read_size &&
+ if (tcp->incoming_buffer->length < target_read_size / 2 &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p alloc_slices", tcp);
@@ -655,7 +664,8 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
return cmsg;
}
- auto tss = reinterpret_cast<struct scm_timestamping*>(CMSG_DATA(cmsg));
+ auto tss =
+ reinterpret_cast<struct grpc_core::scm_timestamping*>(CMSG_DATA(cmsg));
auto serr = reinterpret_cast<struct sock_extended_err*>(CMSG_DATA(next_cmsg));
if (serr->ee_errno != ENOMSG ||
serr->ee_origin != SO_EE_ORIGIN_TIMESTAMPING) {