Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--   src/core/lib/iomgr/combiner.c                  |   3
-rw-r--r--   src/core/lib/iomgr/ev_poll_posix.c             |  10
-rw-r--r--   src/core/lib/iomgr/executor.c                  | 160
-rw-r--r--   src/core/lib/iomgr/executor.h                  |   7
-rw-r--r--   src/core/lib/iomgr/resolve_address_posix.c     |   2
-rw-r--r--   src/core/lib/iomgr/resolve_address_windows.c   |   2
-rw-r--r--   src/core/lib/iomgr/tcp_posix.c                 | 172
7 files changed, 304 insertions(+), 52 deletions(-)
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index c72c37e2b5..518024fcfd 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -82,7 +82,8 @@ grpc_combiner *grpc_combiner_create(void) {
   gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
   gpr_mpscq_init(&lock->queue);
   grpc_closure_list_init(&lock->final_list);
-  GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler);
+  GRPC_CLOSURE_INIT(&lock->offload, offload, lock,
+                    grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
   GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
   return lock;
 }
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 1f8d7eef26..593d53e4eb 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -961,6 +961,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     r = grpc_poll_function(pfds, pfd_count, timeout);
     GRPC_SCHEDULING_END_BLOCKING_REGION;
 
+    if (GRPC_TRACER_ON(grpc_polling_trace)) {
+      gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r);
+    }
+
     if (r < 0) {
       if (errno != EINTR) {
         work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
@@ -981,6 +985,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
       }
     } else {
       if (pfds[0].revents & POLLIN_CHECK) {
+        if (GRPC_TRACER_ON(grpc_polling_trace)){gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset);}
         work_combine_error(
             &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
       }
@@ -988,6 +993,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
         if (watchers[i].fd == NULL) {
           fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL);
         } else {
+          if (GRPC_TRACER_ON(grpc_polling_trace)) {
+            gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset,
+                    pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
+                    (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
+          }
           fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
                       pfds[i].revents & POLLOUT_CHECK, pollset);
         }
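Both hunks above follow iomgr's tracer-flag convention: every log line sits behind a GRPC_TRACER_ON() check, so disabled tracing costs one predictable branch. A minimal standalone sketch of that pattern, with a hypothetical my_trace flag standing in for grpc_polling_trace (gRPC itself flips these flags at startup from the GRPC_TRACE environment variable):

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for grpc_tracer_flag / GRPC_TRACER_ON: a tracer is
   just a named bool that can be flipped at process startup. */
typedef struct {
  bool enabled;
  const char *name;
} my_tracer_flag;

static my_tracer_flag my_trace = {false, "polling"};

#define MY_TRACER_ON(flag) ((flag).enabled)

static void pollset_work_sketch(void *pollset, int poll_result) {
  /* Guard every log line so disabled tracing costs a single branch. */
  if (MY_TRACER_ON(my_trace)) {
    printf("%p poll=%d\n", pollset, poll_result);
  }
}

int main(void) {
  int dummy;
  pollset_work_sketch(&dummy, 0); /* silent: tracing off */
  my_trace.enabled = true;        /* e.g. GRPC_TRACE=polling */
  pollset_work_sketch(&dummy, 1); /* now logs */
  return 0;
}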
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 7621a7fe75..0d38b2334c 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -39,6 +39,7 @@ typedef struct {
   grpc_closure_list elems;
   size_t depth;
   bool shutdown;
+  bool queued_long_job;
   gpr_thd_id id;
 } thread_state;
 
@@ -49,6 +50,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER;
 
 GPR_TLS_DECL(g_this_thread_state);
 
+static grpc_tracer_flag executor_trace =
+    GRPC_TRACER_INITIALIZER(false, "executor");
+
 static void executor_thread(void *arg);
 
 static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
@@ -58,6 +62,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
   while (c != NULL) {
     grpc_closure *next = c->next_data.next;
     grpc_error *error = c->error_data.error;
+    if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+      gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c,
+              c->file_created, c->line_created);
+#else
+      gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c);
+#endif
+    }
 #ifndef NDEBUG
     c->scheduled = false;
 #endif
@@ -65,6 +77,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
     GRPC_ERROR_UNREF(error);
     c = next;
     n++;
+    grpc_exec_ctx_flush(exec_ctx);
   }
 
   return n;
@@ -119,6 +132,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) {
 }
 
 void grpc_executor_init(grpc_exec_ctx *exec_ctx) {
+  grpc_register_tracer(&executor_trace);
   gpr_atm_no_barrier_store(&g_cur_threads, 0);
   grpc_executor_set_threading(exec_ctx, true);
 }
@@ -136,60 +150,136 @@ static void executor_thread(void *arg) {
 
   size_t subtract_depth = 0;
   for (;;) {
+    if (GRPC_TRACER_ON(executor_trace)) {
+      gpr_log(GPR_DEBUG,
+              "EXECUTOR[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
+              ts - g_thread_state, subtract_depth);
+    }
     gpr_mu_lock(&ts->mu);
     ts->depth -= subtract_depth;
     while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
+      ts->queued_long_job = false;
       gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
     }
     if (ts->shutdown) {
+      if (GRPC_TRACER_ON(executor_trace)) {
+        gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: shutdown",
+                ts - g_thread_state);
+      }
       gpr_mu_unlock(&ts->mu);
       break;
     }
     grpc_closure_list exec = ts->elems;
     ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
     gpr_mu_unlock(&ts->mu);
+    if (GRPC_TRACER_ON(executor_trace)) {
+      gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: execute",
+              ts - g_thread_state);
+    }
 
     subtract_depth = run_closures(&exec_ctx, exec);
-    grpc_exec_ctx_flush(&exec_ctx);
   }
   grpc_exec_ctx_finish(&exec_ctx);
 }
 
 static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
-                          grpc_error *error) {
-  size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
-  if (cur_thread_count == 0) {
-    grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
-    return;
-  }
-  thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
-  if (ts == NULL) {
-    ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
-  }
-  gpr_mu_lock(&ts->mu);
-  if (grpc_closure_list_empty(ts->elems)) {
-    gpr_cv_signal(&ts->cv);
-  }
-  grpc_closure_list_append(&ts->elems, closure, error);
-  ts->depth++;
-  bool try_new_thread = ts->depth > MAX_DEPTH &&
-                        cur_thread_count < g_max_threads && !ts->shutdown;
-  gpr_mu_unlock(&ts->mu);
-  if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
-    cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
-    if (cur_thread_count < g_max_threads) {
-      gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
-
-      gpr_thd_options opt = gpr_thd_options_default();
-      gpr_thd_options_set_joinable(&opt);
-      gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
-                  &g_thread_state[cur_thread_count], &opt);
+                          grpc_error *error, bool is_short) {
+  bool retry_push;
+  do {
+    retry_push = false;
+    size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+    if (cur_thread_count == 0) {
+      if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+        gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline",
+                closure, closure->file_created, closure->line_created);
+#else
+        gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure);
+#endif
+      }
+      grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
+      return;
     }
-    gpr_spinlock_unlock(&g_adding_thread_lock);
-  }
+    thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state);
+    if (ts == NULL) {
+      ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)];
+    }
+    thread_state *orig_ts = ts;
+
+    bool try_new_thread;
+    for (;;) {
+      if (GRPC_TRACER_ON(executor_trace)) {
+#ifndef NDEBUG
+        gpr_log(
+            GPR_DEBUG,
+            "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d",
+            closure, is_short ? "short" : "long", closure->file_created,
+            closure->line_created, (int)(ts - g_thread_state));
+#else
+        gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d",
+                closure, is_short ? "short" : "long",
+                (int)(ts - g_thread_state));
+#endif
+      }
+      gpr_mu_lock(&ts->mu);
+      if (ts->queued_long_job) {
+        gpr_mu_unlock(&ts->mu);
+        size_t idx = (size_t)(ts - g_thread_state);
+        ts = &g_thread_state[(idx + 1) % cur_thread_count];
+        if (ts == orig_ts) {
+          retry_push = true;
+          try_new_thread = true;
+          break;
+        }
+        continue;
+      }
+      if (grpc_closure_list_empty(ts->elems)) {
+        gpr_cv_signal(&ts->cv);
+      }
+      grpc_closure_list_append(&ts->elems, closure, error);
+      ts->depth++;
+      try_new_thread = ts->depth > MAX_DEPTH &&
+                       cur_thread_count < g_max_threads && !ts->shutdown;
+      if (!is_short) ts->queued_long_job = true;
+      gpr_mu_unlock(&ts->mu);
+      break;
+    }
+    if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) {
+      cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads);
+      if (cur_thread_count < g_max_threads) {
+        gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1);
+
+        gpr_thd_options opt = gpr_thd_options_default();
+        gpr_thd_options_set_joinable(&opt);
+        gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread,
+                    &g_thread_state[cur_thread_count], &opt);
+      }
+      gpr_spinlock_unlock(&g_adding_thread_lock);
+    }
+  } while (retry_push);
 }
 
-static const grpc_closure_scheduler_vtable executor_vtable = {
-    executor_push, executor_push, "executor"};
-static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
-grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;
+static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+                                grpc_error *error) {
+  executor_push(exec_ctx, closure, error, true);
+}
+
+static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+                               grpc_error *error) {
+  executor_push(exec_ctx, closure, error, false);
+}
+
+static const grpc_closure_scheduler_vtable executor_vtable_short = {
+    executor_push_short, executor_push_short, "executor"};
+static grpc_closure_scheduler executor_scheduler_short = {
+    &executor_vtable_short};
+
+static const grpc_closure_scheduler_vtable executor_vtable_long = {
+    executor_push_long, executor_push_long, "executor"};
+static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long};
+
+grpc_closure_scheduler *grpc_executor_scheduler(
+    grpc_executor_job_length length) {
  return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short
                                       : &executor_scheduler_long;
+}
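The rewritten executor_push walks the thread table round-robin, skipping any thread that already has a long job queued; if the walk wraps back to its starting point, it sets retry_push and try_new_thread so a thread can be added and the whole push repeated. A simplified, self-contained model of just that selection walk (hypothetical names, no locks or real threads):

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Simplified model of one executor thread's queue state. */
typedef struct {
  bool queued_long_job;
} model_thread;

/* Pick a thread starting at `start`, skipping threads that already hold a
   long job. Returns the chosen index, or -1 to signal "all busy: grow the
   pool and retry the push" (the retry_push path in executor_push above). */
static int pick_thread(model_thread *threads, size_t n, size_t start,
                       bool is_short) {
  size_t idx = start;
  do {
    if (!threads[idx].queued_long_job) {
      if (!is_short) threads[idx].queued_long_job = true;
      return (int)idx;
    }
    idx = (idx + 1) % n; /* same wrap-around walk as executor_push */
  } while (idx != start);
  return -1;
}

int main(void) {
  model_thread pool[3] = {{true}, {true}, {false}};
  printf("long job -> thread %d\n", pick_thread(pool, 3, 0, false)); /* 2 */
  printf("long job -> thread %d\n", pick_thread(pool, 3, 0, false)); /* -1 */
  return 0;
}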
"short" : "long", + (int)(ts - g_thread_state)); +#endif + } + gpr_mu_lock(&ts->mu); + if (ts->queued_long_job) { + gpr_mu_unlock(&ts->mu); + size_t idx = (size_t)(ts - g_thread_state); + ts = &g_thread_state[(idx + 1) % cur_thread_count]; + if (ts == orig_ts) { + retry_push = true; + try_new_thread = true; + break; + } + continue; + } + if (grpc_closure_list_empty(ts->elems)) { + gpr_cv_signal(&ts->cv); + } + grpc_closure_list_append(&ts->elems, closure, error); + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && + cur_thread_count < g_max_threads && !ts->shutdown; + if (!is_short) ts->queued_long_job = true; + gpr_mu_unlock(&ts->mu); + break; + } + if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { + cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count < g_max_threads) { + gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, + &g_thread_state[cur_thread_count], &opt); + } + gpr_spinlock_unlock(&g_adding_thread_lock); + } + } while (retry_push); } -static const grpc_closure_scheduler_vtable executor_vtable = { - executor_push, executor_push, "executor"}; -static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; -grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; +static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, true); +} + +static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, false); +} + +static const grpc_closure_scheduler_vtable executor_vtable_short = { + executor_push_short, executor_push_short, "executor"}; +static grpc_closure_scheduler executor_scheduler_short = { + &executor_vtable_short}; + +static const grpc_closure_scheduler_vtable executor_vtable_long = { + executor_push_long, executor_push_long, "executor"}; +static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long}; + +grpc_closure_scheduler *grpc_executor_scheduler( + grpc_executor_job_length length) { + return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short + : &executor_scheduler_long; +} diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index c3382a0a12..0412c02790 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -21,6 +21,11 @@ #include "src/core/lib/iomgr/closure.h" +typedef enum { + GRPC_EXECUTOR_SHORT, + GRPC_EXECUTOR_LONG +} grpc_executor_job_length; + /** Initialize the global executor. * * This mechanism is meant to outsource work (grpc_closure instances) to a @@ -28,7 +33,7 @@ * non-blocking solution available. 
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index 35dedc23de..2bb00e5eed 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -176,7 +176,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
                                  grpc_resolved_addresses **addrs) {
   request *r = gpr_malloc(sizeof(request));
   GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
-                    grpc_executor_scheduler);
+                    grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
   r->name = gpr_strdup(name);
   r->default_port = gpr_strdup(default_port);
   r->on_done = on_done;
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 45cfd7248d..0cb0029f4e 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -159,7 +159,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
                                  grpc_resolved_addresses **addresses) {
   request *r = gpr_malloc(sizeof(request));
   GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
-                    grpc_executor_scheduler);
+                    grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
   r->name = gpr_strdup(name);
   r->default_port = gpr_strdup(default_port);
   r->on_done = on_done;
gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); + } + gpr_mu_lock(p->pollset_mu); + GRPC_LOG_IF_ERROR("backup_poller:pollset_work", + grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC))); + gpr_mu_unlock(p->pollset_mu); + if (gpr_atm_no_barrier_load(&g_backup_poller) == (gpr_atm)p) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); + } + GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); + } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); + } + grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), + GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, + grpc_schedule_on_exec_ctx)); + } +} + +static void drop_uncovered(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller); + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count, + (int)old_count - 1); + } + if (old_count == 1) { + gpr_mu_lock(p->pollset_mu); + bool cas_ok = gpr_atm_no_barrier_cas(&g_backup_poller, (gpr_atm)p, 0); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); + } + GRPC_LOG_IF_ERROR("backup_poller:pollset_kick", + grpc_pollset_kick(BACKUP_POLLER_POLLSET(p), NULL)); + gpr_mu_unlock(p->pollset_mu); + } +} + +static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p; + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count, + 2 + (int)old_count); + } + if (old_count == 0) { + p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); + } + grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); + gpr_atm_no_barrier_store(&g_backup_poller, (gpr_atm)p); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), + GRPC_ERROR_NONE); + } else { + p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller); + GPR_ASSERT(p != NULL); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); + } + grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); + drop_uncovered(exec_ctx, tcp); +} + +static void notify_on_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); + } + GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure); +} + +static void notify_on_write(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); + } + cover_self(exec_ctx, tcp); + GRPC_CLOSURE_INIT(&tcp->write_done_closure, + tcp_drop_uncovered_then_handle_write, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure); +} + +static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + 
gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); + } + drop_uncovered(exec_ctx, (grpc_tcp *)arg); + tcp_handle_write(exec_ctx, arg, error); +} + static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { tcp->bytes_read_this_round += (double)bytes; } @@ -214,6 +350,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, grpc_closure *cb = tcp->read_cb; if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "read: error=%s", str); @@ -268,7 +405,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (errno == EAGAIN) { finish_estimate(tcp); /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -303,7 +440,11 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { - grpc_tcp *tcp = tcpp; + grpc_tcp *tcp = (grpc_tcp *)tcpp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, + grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -319,9 +460,15 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp); + } grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, target_read_size, 1, tcp->incoming_buffer); } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp); + } tcp_do_read(exec_ctx, tcp); } } @@ -330,6 +477,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -353,9 +503,9 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { - GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE); } } @@ -463,7 +613,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { cb = tcp->write_cb; tcp->write_cb = NULL; @@ -516,7 +666,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { if 
@@ -463,7 +613,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     if (GRPC_TRACER_ON(grpc_tcp_trace)) {
       gpr_log(GPR_DEBUG, "write: delayed");
     }
-    grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
+    notify_on_write(exec_ctx, tcp);
   } else {
     cb = tcp->write_cb;
     tcp->write_cb = NULL;
@@ -516,7 +666,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     if (GRPC_TRACER_ON(grpc_tcp_trace)) {
       gpr_log(GPR_DEBUG, "write: delayed");
     }
-    grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
+    notify_on_write(exec_ctx, tcp);
   } else {
     if (GRPC_TRACER_ON(grpc_tcp_trace)) {
       const char *str = grpc_error_string(error);
@@ -593,7 +743,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
                  strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
         grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
         resource_quota = grpc_resource_quota_ref_internal(
-            channel_args->args[i].value.pointer.p);
+            (grpc_resource_quota *)channel_args->args[i].value.pointer.p);
       }
     }
   }
@@ -623,10 +773,6 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
   gpr_ref_init(&tcp->refcount, 1);
   gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
   tcp->em_fd = em_fd;
-  GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp,
-                    grpc_schedule_on_exec_ctx);
-  GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp,
-                    grpc_schedule_on_exec_ctx);
   grpc_slice_buffer_init(&tcp->last_read_buffer);
   tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
   grpc_resource_user_slice_allocator_init(
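The backup poller's lifetime is driven entirely by g_uncovered_notifications_pending: cover_self() adds 2 (one for the poller's continued existence, one for the caller's pending write notification) and immediately drops 1, leaving a net +1 per covered write; tcp_drop_uncovered_then_handle_write() drops the remaining 1 when the notification fires, and whoever takes the count to zero CASes g_backup_poller away and kicks the pollset so run_poller can shut it down. A standalone model of that accounting using C11 atomics (hypothetical names; the real code uses gpr_atm):

#include <stdatomic.h>
#include <stdio.h>

/* Model of the cover/uncover accounting. cover() adds 2 and immediately
   drops 1 (net +1 per covered notification); uncover() drops 1; whoever
   observes the count reaching 0 shuts the backup poller down. */
static atomic_int pending;

static void shutdown_backup_poller(void) { printf("poller shutdown\n"); }

static void uncover(void) {
  int old_count = atomic_fetch_sub(&pending, 1); /* returns previous value */
  printf("uncover %d->%d\n", old_count, old_count - 1);
  if (old_count == 1) shutdown_backup_poller();
}

static void cover(int id) {
  int old_count = atomic_fetch_add(&pending, 2);
  if (old_count == 0) printf("create poller for fd %d\n", id);
  printf("cover %d->%d\n", old_count, old_count + 2);
  uncover(); /* cover_self ends with drop_uncovered, as above */
}

int main(void) {
  cover(1);  /* creates the poller; count settles at 1 */
  cover(2);  /* count settles at 2 */
  uncover(); /* fd 1's write notification fired: count 1 */
  uncover(); /* fd 2's fired: count 0 -> shutdown */
  return 0;
}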