aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/tcp_posix.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc262
1 files changed, 118 insertions, 144 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index d09cfca9af..155329d2e8 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -108,36 +108,31 @@ typedef struct backup_poller {
static gpr_atm g_uncovered_notifications_pending;
static gpr_atm g_backup_poller; /* backup_poller* */
-static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
- grpc_error* error);
-static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
- grpc_error* error);
-static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx,
- void* arg /* grpc_tcp */,
+static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error);
+static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error);
+static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */,
grpc_error* error);
-static void done_poller(grpc_exec_ctx* exec_ctx, void* bp,
- grpc_error* error_ignored) {
+static void done_poller(void* bp, grpc_error* error_ignored) {
backup_poller* p = (backup_poller*)bp;
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p);
}
- grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p));
+ grpc_pollset_destroy(BACKUP_POLLER_POLLSET(p));
gpr_free(p);
}
-static void run_poller(grpc_exec_ctx* exec_ctx, void* bp,
- grpc_error* error_ignored) {
+static void run_poller(void* bp, grpc_error* error_ignored) {
backup_poller* p = (backup_poller*)bp;
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p);
}
gpr_mu_lock(p->pollset_mu);
- grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 13 * GPR_MS_PER_SEC;
- GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx);
+ grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 13 * GPR_MS_PER_SEC;
+ GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS();
GRPC_LOG_IF_ERROR(
"backup_poller:pollset_work",
- grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), nullptr, deadline));
+ grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline));
gpr_mu_unlock(p->pollset_mu);
/* last "uncovered" notification is the ref that keeps us polling, if we get
* there try a cas to release it */
@@ -152,18 +147,18 @@ static void run_poller(grpc_exec_ctx* exec_ctx, void* bp,
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p);
}
- grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p),
+ grpc_pollset_shutdown(BACKUP_POLLER_POLLSET(p),
GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p,
grpc_schedule_on_exec_ctx));
} else {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p);
}
- GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&p->run_poller, GRPC_ERROR_NONE);
}
}
-static void drop_uncovered(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
+static void drop_uncovered(grpc_tcp* tcp) {
backup_poller* p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller);
gpr_atm old_count =
gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1);
@@ -174,7 +169,7 @@ static void drop_uncovered(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
GPR_ASSERT(old_count != 1);
}
-static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
+static void cover_self(grpc_tcp* tcp) {
backup_poller* p;
gpr_atm old_count =
gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2);
@@ -183,7 +178,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
2 + (int)old_count);
}
if (old_count == 0) {
- GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx);
+ GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED();
p = (backup_poller*)gpr_zalloc(sizeof(*p) + grpc_pollset_size());
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p);
@@ -191,7 +186,6 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu);
gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p);
GRPC_CLOSURE_SCHED(
- exec_ctx,
GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p,
grpc_executor_scheduler(GRPC_EXECUTOR_LONG)),
GRPC_ERROR_NONE);
@@ -204,39 +198,38 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp);
}
- grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd);
+ grpc_pollset_add_fd(BACKUP_POLLER_POLLSET(p), tcp->em_fd);
if (old_count != 0) {
- drop_uncovered(exec_ctx, tcp);
+ drop_uncovered(tcp);
}
}
-static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
+static void notify_on_read(grpc_tcp* tcp) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp);
}
GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure);
+ grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_done_closure);
}
-static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
+static void notify_on_write(grpc_tcp* tcp) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp);
}
- cover_self(exec_ctx, tcp);
+ cover_self(tcp);
GRPC_CLOSURE_INIT(&tcp->write_done_closure,
tcp_drop_uncovered_then_handle_write, tcp,
grpc_schedule_on_exec_ctx);
- grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure);
+ grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure);
}
-static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx,
- void* arg, grpc_error* error) {
+static void tcp_drop_uncovered_then_handle_write(void* arg, grpc_error* error) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error));
}
- drop_uncovered(exec_ctx, (grpc_tcp*)arg);
- tcp_handle_write(exec_ctx, arg, error);
+ drop_uncovered((grpc_tcp*)arg);
+ tcp_handle_write(arg, error);
}
static void add_to_estimate(grpc_tcp* tcp, size_t bytes) {
@@ -282,33 +275,29 @@ static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) {
grpc_slice_from_copied_string(tcp->peer_string));
}
-static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
- grpc_error* error);
-static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
- grpc_error* error);
+static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error);
+static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error);
-static void tcp_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
- grpc_error* why) {
+static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) {
grpc_tcp* tcp = (grpc_tcp*)ep;
- grpc_fd_shutdown(exec_ctx, tcp->em_fd, why);
- grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
+ grpc_fd_shutdown(tcp->em_fd, why);
+ grpc_resource_user_shutdown(tcp->resource_user);
}
-static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
- grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
+static void tcp_free(grpc_tcp* tcp) {
+ grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
false /* already_closed */, "tcp_unref_orphan");
- grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
- grpc_resource_user_unref(exec_ctx, tcp->resource_user);
+ grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer);
+ grpc_resource_user_unref(tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
}
#ifndef NDEBUG
-#define TCP_UNREF(cl, tcp, reason) \
- tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__)
#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
-static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
- const char* reason, const char* file, int line) {
+static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file,
+ int line) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
@@ -316,7 +305,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
val - 1);
}
if (gpr_unref(&tcp->refcount)) {
- tcp_free(exec_ctx, tcp);
+ tcp_free(tcp);
}
}
@@ -331,26 +320,25 @@ static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file,
gpr_ref(&tcp->refcount);
}
#else
-#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
+#define TCP_UNREF(tcp, reason) tcp_unref((tcp))
#define TCP_REF(tcp, reason) tcp_ref((tcp))
-static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
+static void tcp_unref(grpc_tcp* tcp) {
if (gpr_unref(&tcp->refcount)) {
- tcp_free(exec_ctx, tcp);
+ tcp_free(tcp);
}
}
static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); }
#endif
-static void tcp_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
+static void tcp_destroy(grpc_endpoint* ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = (grpc_tcp*)ep;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
- TCP_UNREF(exec_ctx, tcp, "destroy");
+ grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ TCP_UNREF(tcp, "destroy");
}
-static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
- grpc_error* error) {
+static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
grpc_closure* cb = tcp->read_cb;
if (grpc_tcp_trace.enabled()) {
@@ -369,11 +357,11 @@ static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
- GRPC_CLOSURE_RUN(exec_ctx, cb, error);
+ GRPC_CLOSURE_RUN(cb, error);
}
#define MAX_READ_IOVEC 4
-static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
+static void tcp_do_read(grpc_tcp* tcp) {
struct msghdr msg;
struct iovec iov[MAX_READ_IOVEC];
ssize_t read_bytes;
@@ -396,12 +384,12 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
msg.msg_controllen = 0;
msg.msg_flags = 0;
- GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, tcp->incoming_buffer->length);
- GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count);
+ GRPC_STATS_INC_TCP_READ_OFFER(tcp->incoming_buffer->length);
+ GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(tcp->incoming_buffer->count);
GPR_TIMER_BEGIN("recvmsg", 0);
do {
- GRPC_STATS_INC_SYSCALL_READ(exec_ctx);
+ GRPC_STATS_INC_SYSCALL_READ();
read_bytes = recvmsg(tcp->fd, &msg, 0);
} while (read_bytes < 0 && errno == EINTR);
GPR_TIMER_END("recvmsg", read_bytes >= 0);
@@ -412,24 +400,22 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
if (errno == EAGAIN) {
finish_estimate(tcp);
/* We've consumed the edge, request a new one */
- notify_on_read(exec_ctx, tcp);
+ notify_on_read(tcp);
} else {
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
- tcp->incoming_buffer);
- call_read_cb(exec_ctx, tcp,
+ grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
+ call_read_cb(tcp,
tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
- TCP_UNREF(exec_ctx, tcp, "read");
+ TCP_UNREF(tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
call_read_cb(
- exec_ctx, tcp,
- tcp_annotate_error(
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
- TCP_UNREF(exec_ctx, tcp, "read");
+ tcp, tcp_annotate_error(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp));
+ TCP_UNREF(tcp, "read");
} else {
- GRPC_STATS_INC_TCP_READ_SIZE(exec_ctx, read_bytes);
+ GRPC_STATS_INC_TCP_READ_SIZE(read_bytes);
add_to_estimate(tcp, (size_t)read_bytes);
GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
if ((size_t)read_bytes < tcp->incoming_buffer->length) {
@@ -439,50 +425,47 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
&tcp->last_read_buffer);
}
GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
- call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE);
- TCP_UNREF(exec_ctx, tcp, "read");
+ call_read_cb(tcp, GRPC_ERROR_NONE);
+ TCP_UNREF(tcp, "read");
}
GPR_TIMER_END("tcp_continue_read", 0);
}
-static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp,
- grpc_error* error) {
+static void tcp_read_allocation_done(void* tcpp, grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)tcpp;
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp,
grpc_error_string(error));
}
if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
- &tcp->last_read_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
- TCP_UNREF(exec_ctx, tcp, "read");
+ grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ call_read_cb(tcp, GRPC_ERROR_REF(error));
+ TCP_UNREF(tcp, "read");
} else {
- tcp_do_read(exec_ctx, tcp);
+ tcp_do_read(tcp);
}
}
-static void tcp_continue_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) {
+static void tcp_continue_read(grpc_tcp* tcp) {
size_t target_read_size = get_target_read_size(tcp);
if (tcp->incoming_buffer->length < target_read_size &&
tcp->incoming_buffer->count < MAX_READ_IOVEC) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp);
}
- grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator,
- target_read_size, 1, tcp->incoming_buffer);
+ grpc_resource_user_alloc_slices(&tcp->slice_allocator, target_read_size, 1,
+ tcp->incoming_buffer);
} else {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp);
}
- tcp_do_read(exec_ctx, tcp);
+ tcp_do_read(tcp);
}
}
-static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
- grpc_error* error) {
+static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)arg;
GPR_ASSERT(!tcp->finished_edge);
if (grpc_tcp_trace.enabled()) {
@@ -490,37 +473,35 @@ static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
}
if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
- &tcp->last_read_buffer);
- call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
- TCP_UNREF(exec_ctx, tcp, "read");
+ grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ call_read_cb(tcp, GRPC_ERROR_REF(error));
+ TCP_UNREF(tcp, "read");
} else {
- tcp_continue_read(exec_ctx, tcp);
+ tcp_continue_read(tcp);
}
}
-static void tcp_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
- grpc_slice_buffer* incoming_buffer, grpc_closure* cb) {
+static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,
+ grpc_closure* cb) {
grpc_tcp* tcp = (grpc_tcp*)ep;
GPR_ASSERT(tcp->read_cb == nullptr);
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = false;
- notify_on_read(exec_ctx, tcp);
+ notify_on_read(tcp);
} else {
- GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE);
}
}
/* returns true if done, false if pending; if returning true, *error is set */
#define MAX_WRITE_IOVEC 1000
-static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
- grpc_error** error) {
+static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) {
struct msghdr msg;
struct iovec iov[MAX_WRITE_IOVEC];
msg_iovlen_type iov_size;
@@ -562,13 +543,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
msg.msg_controllen = 0;
msg.msg_flags = 0;
- GRPC_STATS_INC_TCP_WRITE_SIZE(exec_ctx, sending_length);
- GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(exec_ctx, iov_size);
+ GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
+ GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
GPR_TIMER_BEGIN("sendmsg", 1);
do {
/* TODO(klempner): Cork if this is a partial write */
- GRPC_STATS_INC_SYSCALL_WRITE(exec_ctx);
+ GRPC_STATS_INC_SYSCALL_WRITE();
sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
} while (sent_length < 0 && errno == EINTR);
GPR_TIMER_END("sendmsg", 0);
@@ -580,20 +561,18 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
// point
for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
grpc_slice_unref_internal(
- exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer));
+ grpc_slice_buffer_take_first(tcp->outgoing_buffer));
}
return false;
} else if (errno == EPIPE) {
*error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"),
GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
- tcp->outgoing_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
return true;
} else {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
- tcp->outgoing_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
return true;
}
}
@@ -616,31 +595,29 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp,
if (outgoing_slice_idx == tcp->outgoing_buffer->count) {
*error = GRPC_ERROR_NONE;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
- tcp->outgoing_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
return true;
}
}
}
-static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
- grpc_error* error) {
+static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
grpc_tcp* tcp = (grpc_tcp*)arg;
grpc_closure* cb;
if (error != GRPC_ERROR_NONE) {
cb = tcp->write_cb;
tcp->write_cb = nullptr;
- cb->cb(exec_ctx, cb->cb_arg, error);
- TCP_UNREF(exec_ctx, tcp, "write");
+ cb->cb(cb->cb_arg, error);
+ TCP_UNREF(tcp, "write");
return;
}
- if (!tcp_flush(exec_ctx, tcp, &error)) {
+ if (!tcp_flush(tcp, &error)) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "write: delayed");
}
- notify_on_write(exec_ctx, tcp);
+ notify_on_write(tcp);
} else {
cb = tcp->write_cb;
tcp->write_cb = nullptr;
@@ -649,13 +626,13 @@ static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */,
gpr_log(GPR_DEBUG, "write: %s", str);
}
- GRPC_CLOSURE_RUN(exec_ctx, cb, error);
- TCP_UNREF(exec_ctx, tcp, "write");
+ GRPC_CLOSURE_RUN(cb, error);
+ TCP_UNREF(tcp, "write");
}
}
-static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
- grpc_slice_buffer* buf, grpc_closure* cb) {
+static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf,
+ grpc_closure* cb) {
grpc_tcp* tcp = (grpc_tcp*)ep;
grpc_error* error = GRPC_ERROR_NONE;
@@ -676,51 +653,48 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
GRPC_CLOSURE_SCHED(
- exec_ctx, cb,
- grpc_fd_is_shutdown(tcp->em_fd)
- ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
- tcp)
- : GRPC_ERROR_NONE);
+ cb, grpc_fd_is_shutdown(tcp->em_fd)
+ ? tcp_annotate_error(
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp)
+ : GRPC_ERROR_NONE);
return;
}
tcp->outgoing_buffer = buf;
tcp->outgoing_byte_idx = 0;
- if (!tcp_flush(exec_ctx, tcp, &error)) {
+ if (!tcp_flush(tcp, &error)) {
TCP_REF(tcp, "write");
tcp->write_cb = cb;
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "write: delayed");
}
- notify_on_write(exec_ctx, tcp);
+ notify_on_write(tcp);
} else {
if (grpc_tcp_trace.enabled()) {
const char* str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "write: %s", str);
}
- GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(cb, error);
}
GPR_TIMER_END("tcp_write", 0);
}
-static void tcp_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
- grpc_pollset* pollset) {
+static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) {
grpc_tcp* tcp = (grpc_tcp*)ep;
- grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
+ grpc_pollset_add_fd(pollset, tcp->em_fd);
}
-static void tcp_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+static void tcp_add_to_pollset_set(grpc_endpoint* ep,
grpc_pollset_set* pollset_set) {
grpc_tcp* tcp = (grpc_tcp*)ep;
- grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
+ grpc_pollset_set_add_fd(pollset_set, tcp->em_fd);
}
-static void tcp_delete_from_pollset_set(grpc_exec_ctx* exec_ctx,
- grpc_endpoint* ep,
+static void tcp_delete_from_pollset_set(grpc_endpoint* ep,
grpc_pollset_set* pollset_set) {
grpc_tcp* tcp = (grpc_tcp*)ep;
- grpc_pollset_set_del_fd(exec_ctx, pollset_set, tcp->em_fd);
+ grpc_pollset_set_del_fd(pollset_set, tcp->em_fd);
}
static char* tcp_get_peer(grpc_endpoint* ep) {
@@ -751,7 +725,7 @@ static const grpc_endpoint_vtable vtable = {tcp_read,
#define MAX_CHUNK_SIZE 32 * 1024 * 1024
-grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_fd* em_fd,
+grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
const grpc_channel_args* channel_args,
const char* peer_string) {
int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE;
@@ -780,7 +754,7 @@ grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_fd* em_fd,
grpc_channel_arg_get_integer(&channel_args->args[i], options);
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(resource_quota);
resource_quota = grpc_resource_quota_ref_internal(
(grpc_resource_quota*)channel_args->args[i].value.pointer.p);
}
@@ -817,7 +791,7 @@ grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_fd* em_fd,
&tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp);
/* Tell network status tracker about new endpoint */
grpc_network_status_register_endpoint(&tcp->base);
- grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(resource_quota);
return &tcp->base;
}
@@ -828,15 +802,15 @@ int grpc_tcp_fd(grpc_endpoint* ep) {
return grpc_fd_wrapped_fd(tcp->em_fd);
}
-void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
- int* fd, grpc_closure* done) {
+void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd,
+ grpc_closure* done) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp* tcp = (grpc_tcp*)ep;
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
- grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
- TCP_UNREF(exec_ctx, tcp, "destroy");
+ grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer);
+ TCP_UNREF(tcp, "destroy");
}
#endif