diff options
Diffstat (limited to 'src/core/lib/iomgr/tcp_posix.cc')
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 262 |
1 files changed, 118 insertions, 144 deletions
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index d09cfca9af..155329d2e8 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -108,36 +108,31 @@ typedef struct backup_poller { static gpr_atm g_uncovered_notifications_pending; static gpr_atm g_backup_poller; /* backup_poller* */ -static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, - grpc_error* error); -static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, - grpc_error* error); -static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx, - void* arg /* grpc_tcp */, +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_drop_uncovered_then_handle_write(void* arg /* grpc_tcp */, grpc_error* error); -static void done_poller(grpc_exec_ctx* exec_ctx, void* bp, - grpc_error* error_ignored) { +static void done_poller(void* bp, grpc_error* error_ignored) { backup_poller* p = (backup_poller*)bp; if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p); } - grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p)); + grpc_pollset_destroy(BACKUP_POLLER_POLLSET(p)); gpr_free(p); } -static void run_poller(grpc_exec_ctx* exec_ctx, void* bp, - grpc_error* error_ignored) { +static void run_poller(void* bp, grpc_error* error_ignored) { backup_poller* p = (backup_poller*)bp; if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); } gpr_mu_lock(p->pollset_mu); - grpc_millis deadline = grpc_exec_ctx_now(exec_ctx) + 13 * GPR_MS_PER_SEC; - GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx); + grpc_millis deadline = grpc_core::ExecCtx::Get()->Now() + 13 * GPR_MS_PER_SEC; + GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(); GRPC_LOG_IF_ERROR( "backup_poller:pollset_work", - grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), nullptr, deadline)); + grpc_pollset_work(BACKUP_POLLER_POLLSET(p), nullptr, deadline)); gpr_mu_unlock(p->pollset_mu); /* last "uncovered" notification is the ref that keeps us polling, if we get * there try a cas to release it */ @@ -152,18 +147,18 @@ static void run_poller(grpc_exec_ctx* exec_ctx, void* bp, if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); } - grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), + grpc_pollset_shutdown(BACKUP_POLLER_POLLSET(p), GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, grpc_schedule_on_exec_ctx)); } else { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); } - GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&p->run_poller, GRPC_ERROR_NONE); } } -static void drop_uncovered(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { +static void drop_uncovered(grpc_tcp* tcp) { backup_poller* p = (backup_poller*)gpr_atm_acq_load(&g_backup_poller); gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); @@ -174,7 +169,7 @@ static void drop_uncovered(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { GPR_ASSERT(old_count != 1); } -static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { +static void cover_self(grpc_tcp* tcp) { backup_poller* p; gpr_atm old_count = gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); @@ -183,7 +178,7 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { 2 + (int)old_count); } if (old_count == 0) { - GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); + GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(); p = (backup_poller*)gpr_zalloc(sizeof(*p) + grpc_pollset_size()); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); @@ -191,7 +186,6 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p); GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), GRPC_ERROR_NONE); @@ -204,39 +198,38 @@ static void cover_self(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); } - grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); + grpc_pollset_add_fd(BACKUP_POLLER_POLLSET(p), tcp->em_fd); if (old_count != 0) { - drop_uncovered(exec_ctx, tcp); + drop_uncovered(tcp); } } -static void notify_on_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { +static void notify_on_read(grpc_tcp* tcp) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); } GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_done_closure); } -static void notify_on_write(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { +static void notify_on_write(grpc_tcp* tcp) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); } - cover_self(exec_ctx, tcp); + cover_self(tcp); GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_drop_uncovered_then_handle_write, tcp, grpc_schedule_on_exec_ctx); - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure); + grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure); } -static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx* exec_ctx, - void* arg, grpc_error* error) { +static void tcp_drop_uncovered_then_handle_write(void* arg, grpc_error* error) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); } - drop_uncovered(exec_ctx, (grpc_tcp*)arg); - tcp_handle_write(exec_ctx, arg, error); + drop_uncovered((grpc_tcp*)arg); + tcp_handle_write(arg, error); } static void add_to_estimate(grpc_tcp* tcp, size_t bytes) { @@ -282,33 +275,29 @@ static grpc_error* tcp_annotate_error(grpc_error* src_error, grpc_tcp* tcp) { grpc_slice_from_copied_string(tcp->peer_string)); } -static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, - grpc_error* error); -static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, - grpc_error* error); +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error); +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error); -static void tcp_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_error* why) { +static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { grpc_tcp* tcp = (grpc_tcp*)ep; - grpc_fd_shutdown(exec_ctx, tcp->em_fd, why); - grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); + grpc_fd_shutdown(tcp->em_fd, why); + grpc_resource_user_shutdown(tcp->resource_user); } -static void tcp_free(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { - grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, +static void tcp_free(grpc_tcp* tcp) { + grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, false /* already_closed */, "tcp_unref_orphan"); - grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer); - grpc_resource_user_unref(exec_ctx, tcp->resource_user); + grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); + grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); gpr_free(tcp); } #ifndef NDEBUG -#define TCP_UNREF(cl, tcp, reason) \ - tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, - const char* reason, const char* file, int line) { +static void tcp_unref(grpc_tcp* tcp, const char* reason, const char* file, + int line) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&tcp->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, @@ -316,7 +305,7 @@ static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, val - 1); } if (gpr_unref(&tcp->refcount)) { - tcp_free(exec_ctx, tcp); + tcp_free(tcp); } } @@ -331,26 +320,25 @@ static void tcp_ref(grpc_tcp* tcp, const char* reason, const char* file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp)) +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { +static void tcp_unref(grpc_tcp* tcp) { if (gpr_unref(&tcp->refcount)) { - tcp_free(exec_ctx, tcp); + tcp_free(tcp); } } static void tcp_ref(grpc_tcp* tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { +static void tcp_destroy(grpc_endpoint* ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = (grpc_tcp*)ep; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer); - TCP_UNREF(exec_ctx, tcp, "destroy"); + grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + TCP_UNREF(tcp, "destroy"); } -static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, - grpc_error* error) { +static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { grpc_closure* cb = tcp->read_cb; if (grpc_tcp_trace.enabled()) { @@ -369,11 +357,11 @@ static void call_read_cb(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(exec_ctx, cb, error); + GRPC_CLOSURE_RUN(cb, error); } #define MAX_READ_IOVEC 4 -static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { +static void tcp_do_read(grpc_tcp* tcp) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; @@ -396,12 +384,12 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_STATS_INC_TCP_READ_OFFER(exec_ctx, tcp->incoming_buffer->length); - GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(exec_ctx, tcp->incoming_buffer->count); + GRPC_STATS_INC_TCP_READ_OFFER(tcp->incoming_buffer->length); + GRPC_STATS_INC_TCP_READ_OFFER_IOV_SIZE(tcp->incoming_buffer->count); GPR_TIMER_BEGIN("recvmsg", 0); do { - GRPC_STATS_INC_SYSCALL_READ(exec_ctx); + GRPC_STATS_INC_SYSCALL_READ(); read_bytes = recvmsg(tcp->fd, &msg, 0); } while (read_bytes < 0 && errno == EINTR); GPR_TIMER_END("recvmsg", read_bytes >= 0); @@ -412,24 +400,22 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { if (errno == EAGAIN) { finish_estimate(tcp); /* We've consumed the edge, request a new one */ - notify_on_read(exec_ctx, tcp); + notify_on_read(tcp); } else { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - tcp->incoming_buffer); - call_read_cb(exec_ctx, tcp, + grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); + call_read_cb(tcp, tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp)); - TCP_UNREF(exec_ctx, tcp, "read"); + TCP_UNREF(tcp, "read"); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); call_read_cb( - exec_ctx, tcp, - tcp_annotate_error( - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); - TCP_UNREF(exec_ctx, tcp, "read"); + tcp, tcp_annotate_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); + TCP_UNREF(tcp, "read"); } else { - GRPC_STATS_INC_TCP_READ_SIZE(exec_ctx, read_bytes); + GRPC_STATS_INC_TCP_READ_SIZE(read_bytes); add_to_estimate(tcp, (size_t)read_bytes); GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { @@ -439,50 +425,47 @@ static void tcp_do_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { &tcp->last_read_buffer); } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); - TCP_UNREF(exec_ctx, tcp, "read"); + call_read_cb(tcp, GRPC_ERROR_NONE); + TCP_UNREF(tcp, "read"); } GPR_TIMER_END("tcp_continue_read", 0); } -static void tcp_read_allocation_done(grpc_exec_ctx* exec_ctx, void* tcpp, - grpc_error* error) { +static void tcp_read_allocation_done(void* tcpp, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)tcpp; if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, grpc_error_string(error)); } if (error != GRPC_ERROR_NONE) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - &tcp->last_read_buffer); - call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); - TCP_UNREF(exec_ctx, tcp, "read"); + grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + call_read_cb(tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "read"); } else { - tcp_do_read(exec_ctx, tcp); + tcp_do_read(tcp); } } -static void tcp_continue_read(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp) { +static void tcp_continue_read(grpc_tcp* tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp); } - grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, - target_read_size, 1, tcp->incoming_buffer); + grpc_resource_user_alloc_slices(&tcp->slice_allocator, target_read_size, 1, + tcp->incoming_buffer); } else { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp); } - tcp_do_read(exec_ctx, tcp); + tcp_do_read(tcp); } } -static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, - grpc_error* error) { +static void tcp_handle_read(void* arg /* grpc_tcp */, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)arg; GPR_ASSERT(!tcp->finished_edge); if (grpc_tcp_trace.enabled()) { @@ -490,37 +473,35 @@ static void tcp_handle_read(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, } if (error != GRPC_ERROR_NONE) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - &tcp->last_read_buffer); - call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error)); - TCP_UNREF(exec_ctx, tcp, "read"); + grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer); + grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + call_read_cb(tcp, GRPC_ERROR_REF(error)); + TCP_UNREF(tcp, "read"); } else { - tcp_continue_read(exec_ctx, tcp); + tcp_continue_read(tcp); } } -static void tcp_read(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* incoming_buffer, grpc_closure* cb) { +static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer, + grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; GPR_ASSERT(tcp->read_cb == nullptr); tcp->read_cb = cb; tcp->incoming_buffer = incoming_buffer; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, incoming_buffer); + grpc_slice_buffer_reset_and_unref_internal(incoming_buffer); grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer); TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; - notify_on_read(exec_ctx, tcp); + notify_on_read(tcp); } else { - GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&tcp->read_done_closure, GRPC_ERROR_NONE); } } /* returns true if done, false if pending; if returning true, *error is set */ #define MAX_WRITE_IOVEC 1000 -static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, - grpc_error** error) { +static bool tcp_flush(grpc_tcp* tcp, grpc_error** error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -562,13 +543,13 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, msg.msg_controllen = 0; msg.msg_flags = 0; - GRPC_STATS_INC_TCP_WRITE_SIZE(exec_ctx, sending_length); - GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(exec_ctx, iov_size); + GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length); + GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size); GPR_TIMER_BEGIN("sendmsg", 1); do { /* TODO(klempner): Cork if this is a partial write */ - GRPC_STATS_INC_SYSCALL_WRITE(exec_ctx); + GRPC_STATS_INC_SYSCALL_WRITE(); sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS); } while (sent_length < 0 && errno == EINTR); GPR_TIMER_END("sendmsg", 0); @@ -580,20 +561,18 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, // point for (size_t idx = 0; idx < unwind_slice_idx; ++idx) { grpc_slice_unref_internal( - exec_ctx, grpc_slice_buffer_take_first(tcp->outgoing_buffer)); + grpc_slice_buffer_take_first(tcp->outgoing_buffer)); } return false; } else if (errno == EPIPE) { *error = grpc_error_set_int(GRPC_OS_ERROR(errno, "sendmsg"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - tcp->outgoing_buffer); + grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); return true; } else { *error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp); - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - tcp->outgoing_buffer); + grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); return true; } } @@ -616,31 +595,29 @@ static bool tcp_flush(grpc_exec_ctx* exec_ctx, grpc_tcp* tcp, if (outgoing_slice_idx == tcp->outgoing_buffer->count) { *error = GRPC_ERROR_NONE; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, - tcp->outgoing_buffer); + grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer); return true; } } } -static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, - grpc_error* error) { +static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { grpc_tcp* tcp = (grpc_tcp*)arg; grpc_closure* cb; if (error != GRPC_ERROR_NONE) { cb = tcp->write_cb; tcp->write_cb = nullptr; - cb->cb(exec_ctx, cb->cb_arg, error); - TCP_UNREF(exec_ctx, tcp, "write"); + cb->cb(cb->cb_arg, error); + TCP_UNREF(tcp, "write"); return; } - if (!tcp_flush(exec_ctx, tcp, &error)) { + if (!tcp_flush(tcp, &error)) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "write: delayed"); } - notify_on_write(exec_ctx, tcp); + notify_on_write(tcp); } else { cb = tcp->write_cb; tcp->write_cb = nullptr; @@ -649,13 +626,13 @@ static void tcp_handle_write(grpc_exec_ctx* exec_ctx, void* arg /* grpc_tcp */, gpr_log(GPR_DEBUG, "write: %s", str); } - GRPC_CLOSURE_RUN(exec_ctx, cb, error); - TCP_UNREF(exec_ctx, tcp, "write"); + GRPC_CLOSURE_RUN(cb, error); + TCP_UNREF(tcp, "write"); } } -static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_slice_buffer* buf, grpc_closure* cb) { +static void tcp_write(grpc_endpoint* ep, grpc_slice_buffer* buf, + grpc_closure* cb) { grpc_tcp* tcp = (grpc_tcp*)ep; grpc_error* error = GRPC_ERROR_NONE; @@ -676,51 +653,48 @@ static void tcp_write(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); GRPC_CLOSURE_SCHED( - exec_ctx, cb, - grpc_fd_is_shutdown(tcp->em_fd) - ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), - tcp) - : GRPC_ERROR_NONE); + cb, grpc_fd_is_shutdown(tcp->em_fd) + ? tcp_annotate_error( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), tcp) + : GRPC_ERROR_NONE); return; } tcp->outgoing_buffer = buf; tcp->outgoing_byte_idx = 0; - if (!tcp_flush(exec_ctx, tcp, &error)) { + if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "write: delayed"); } - notify_on_write(exec_ctx, tcp); + notify_on_write(tcp); } else { if (grpc_tcp_trace.enabled()) { const char* str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } - GRPC_CLOSURE_SCHED(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(cb, error); } GPR_TIMER_END("tcp_write", 0); } -static void tcp_add_to_pollset(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - grpc_pollset* pollset) { +static void tcp_add_to_pollset(grpc_endpoint* ep, grpc_pollset* pollset) { grpc_tcp* tcp = (grpc_tcp*)ep; - grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd); + grpc_pollset_add_fd(pollset, tcp->em_fd); } -static void tcp_add_to_pollset_set(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, +static void tcp_add_to_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset_set) { grpc_tcp* tcp = (grpc_tcp*)ep; - grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd); + grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); } -static void tcp_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, - grpc_endpoint* ep, +static void tcp_delete_from_pollset_set(grpc_endpoint* ep, grpc_pollset_set* pollset_set) { grpc_tcp* tcp = (grpc_tcp*)ep; - grpc_pollset_set_del_fd(exec_ctx, pollset_set, tcp->em_fd); + grpc_pollset_set_del_fd(pollset_set, tcp->em_fd); } static char* tcp_get_peer(grpc_endpoint* ep) { @@ -751,7 +725,7 @@ static const grpc_endpoint_vtable vtable = {tcp_read, #define MAX_CHUNK_SIZE 32 * 1024 * 1024 -grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_fd* em_fd, +grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd, const grpc_channel_args* channel_args, const char* peer_string) { int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; @@ -780,7 +754,7 @@ grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_fd* em_fd, grpc_channel_arg_get_integer(&channel_args->args[i], options); } else if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + grpc_resource_quota_unref_internal(resource_quota); resource_quota = grpc_resource_quota_ref_internal( (grpc_resource_quota*)channel_args->args[i].value.pointer.p); } @@ -817,7 +791,7 @@ grpc_endpoint* grpc_tcp_create(grpc_exec_ctx* exec_ctx, grpc_fd* em_fd, &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + grpc_resource_quota_unref_internal(resource_quota); return &tcp->base; } @@ -828,15 +802,15 @@ int grpc_tcp_fd(grpc_endpoint* ep) { return grpc_fd_wrapped_fd(tcp->em_fd); } -void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, - int* fd, grpc_closure* done) { +void grpc_tcp_destroy_and_release_fd(grpc_endpoint* ep, int* fd, + grpc_closure* done) { grpc_network_status_unregister_endpoint(ep); grpc_tcp* tcp = (grpc_tcp*)ep; GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer); - TCP_UNREF(exec_ctx, tcp, "destroy"); + grpc_slice_buffer_reset_and_unref_internal(&tcp->last_read_buffer); + TCP_UNREF(tcp, "destroy"); } #endif |