diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/http/httpcli.c | 13 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 10 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.c | 160 | ||||
-rw-r--r-- | src/core/lib/iomgr/executor.h | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_posix.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_windows.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 176 | ||||
-rw-r--r-- | src/core/lib/security/transport/security_handshaker.c | 34 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 8 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 3 |
11 files changed, 342 insertions, 76 deletions
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 77af7b7c08..84cc39604c 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -130,7 +130,7 @@ static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) { static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error) { - internal_request *req = user_data; + internal_request *req = (internal_request *)user_data; size_t i; for (i = 0; i < req->incoming.count; i++) { @@ -159,7 +159,7 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) { } static void done_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - internal_request *req = arg; + internal_request *req = (internal_request *)arg; if (error == GRPC_ERROR_NONE) { on_written(exec_ctx, req); } else { @@ -175,7 +175,7 @@ static void start_write(grpc_exec_ctx *exec_ctx, internal_request *req) { static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *ep) { - internal_request *req = arg; + internal_request *req = (internal_request *)arg; if (!ep) { next_address(exec_ctx, req, GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -189,7 +189,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - internal_request *req = arg; + internal_request *req = (internal_request *)arg; if (!req->ep) { next_address(exec_ctx, req, GRPC_ERROR_REF(error)); @@ -226,7 +226,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, } static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - internal_request *req = arg; + internal_request *req = (internal_request *)arg; if (error != GRPC_ERROR_NONE) { finish(exec_ctx, req, GRPC_ERROR_REF(error)); return; @@ -243,7 +243,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, gpr_timespec deadline, grpc_closure *on_done, grpc_httpcli_response *response, const char *name, grpc_slice request_text) { - internal_request *req = gpr_malloc(sizeof(internal_request)); + internal_request *req = + (internal_request *)gpr_malloc(sizeof(internal_request)); memset(req, 0, sizeof(*req)); req->request_text = request_text; grpc_http_parser_init(&req->parser, GRPC_HTTP_RESPONSE, response); diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 9b66987b68..0e496829f6 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -80,7 +80,8 @@ grpc_combiner *grpc_combiner_create(void) { gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler); + GRPC_CLOSURE_INIT(&lock->offload, offload, lock, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 9472a8e520..266c280c48 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -986,6 +986,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, r = grpc_poll_function(pfds, pfd_count, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); + } + if (r < 0) { if (errno != EINTR) { work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); @@ -1006,6 +1010,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { + if (GRPC_TRACER_ON(grpc_polling_trace)){gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset);} work_combine_error( &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd)); } @@ -1013,6 +1018,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (watchers[i].fd == NULL) { fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset, + pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, + (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); + } fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 7621a7fe75..0d38b2334c 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -39,6 +39,7 @@ typedef struct { grpc_closure_list elems; size_t depth; bool shutdown; + bool queued_long_job; gpr_thd_id id; } thread_state; @@ -49,6 +50,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; GPR_TLS_DECL(g_this_thread_state); +static grpc_tracer_flag executor_trace = + GRPC_TRACER_INITIALIZER(false, "executor"); + static void executor_thread(void *arg); static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { @@ -58,6 +62,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { while (c != NULL) { grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); +#endif + } #ifndef NDEBUG c->scheduled = false; #endif @@ -65,6 +77,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { GRPC_ERROR_UNREF(error); c = next; n++; + grpc_exec_ctx_flush(exec_ctx); } return n; @@ -119,6 +132,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { } void grpc_executor_init(grpc_exec_ctx *exec_ctx) { + grpc_register_tracer(&executor_trace); gpr_atm_no_barrier_store(&g_cur_threads, 0); grpc_executor_set_threading(exec_ctx, true); } @@ -136,60 +150,136 @@ static void executor_thread(void *arg) { size_t subtract_depth = 0; for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, + "EXECUTOR[%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")", + ts - g_thread_state, subtract_depth); + } gpr_mu_lock(&ts->mu); ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { + ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } if (ts->shutdown) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: shutdown", + ts - g_thread_state); + } gpr_mu_unlock(&ts->mu); break; } grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%" PRIdPTR "]: execute", + ts - g_thread_state); + } subtract_depth = run_closures(&exec_ctx, exec); - grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); } static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - if (cur_thread_count == 0) { - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); - return; - } - thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); - if (ts == NULL) { - ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; - } - gpr_mu_lock(&ts->mu); - if (grpc_closure_list_empty(ts->elems)) { - gpr_cv_signal(&ts->cv); - } - grpc_closure_list_append(&ts->elems, closure, error); - ts->depth++; - bool try_new_thread = ts->depth > MAX_DEPTH && - cur_thread_count < g_max_threads && !ts->shutdown; - gpr_mu_unlock(&ts->mu); - if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { - cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - if (cur_thread_count < g_max_threads) { - gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, - &g_thread_state[cur_thread_count], &opt); + grpc_error *error, bool is_short) { + bool retry_push; + do { + retry_push = false; + size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count == 0) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline", + closure, closure->file_created, closure->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure); +#endif + } + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + return; } - gpr_spinlock_unlock(&g_adding_thread_lock); - } + thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); + if (ts == NULL) { + ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + } + thread_state *orig_ts = ts; + + bool try_new_thread; + for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log( + GPR_DEBUG, + "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d", + closure, is_short ? "short" : "long", closure->file_created, + closure->line_created, (int)(ts - g_thread_state)); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d", + closure, is_short ? "short" : "long", + (int)(ts - g_thread_state)); +#endif + } + gpr_mu_lock(&ts->mu); + if (ts->queued_long_job) { + gpr_mu_unlock(&ts->mu); + size_t idx = (size_t)(ts - g_thread_state); + ts = &g_thread_state[(idx + 1) % cur_thread_count]; + if (ts == orig_ts) { + retry_push = true; + try_new_thread = true; + break; + } + continue; + } + if (grpc_closure_list_empty(ts->elems)) { + gpr_cv_signal(&ts->cv); + } + grpc_closure_list_append(&ts->elems, closure, error); + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && + cur_thread_count < g_max_threads && !ts->shutdown; + if (!is_short) ts->queued_long_job = true; + gpr_mu_unlock(&ts->mu); + break; + } + if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { + cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count < g_max_threads) { + gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, + &g_thread_state[cur_thread_count], &opt); + } + gpr_spinlock_unlock(&g_adding_thread_lock); + } + } while (retry_push); } -static const grpc_closure_scheduler_vtable executor_vtable = { - executor_push, executor_push, "executor"}; -static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; -grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; +static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, true); +} + +static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, false); +} + +static const grpc_closure_scheduler_vtable executor_vtable_short = { + executor_push_short, executor_push_short, "executor"}; +static grpc_closure_scheduler executor_scheduler_short = { + &executor_vtable_short}; + +static const grpc_closure_scheduler_vtable executor_vtable_long = { + executor_push_long, executor_push_long, "executor"}; +static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long}; + +grpc_closure_scheduler *grpc_executor_scheduler( + grpc_executor_job_length length) { + return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short + : &executor_scheduler_long; +} diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index c3382a0a12..0412c02790 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -21,6 +21,11 @@ #include "src/core/lib/iomgr/closure.h" +typedef enum { + GRPC_EXECUTOR_SHORT, + GRPC_EXECUTOR_LONG +} grpc_executor_job_length; + /** Initialize the global executor. * * This mechanism is meant to outsource work (grpc_closure instances) to a @@ -28,7 +33,7 @@ * non-blocking solution available. */ void grpc_executor_init(grpc_exec_ctx *exec_ctx); -extern grpc_closure_scheduler *grpc_executor_scheduler; +grpc_closure_scheduler *grpc_executor_scheduler(grpc_executor_job_length); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 35dedc23de..2bb00e5eed 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -176,7 +176,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_resolved_addresses **addrs) { request *r = gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler); + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 45cfd7248d..0cb0029f4e 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -159,7 +159,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_resolved_addresses **addresses) { request *r = gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler); + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 2f543fd8a9..1bace788c5 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -42,6 +42,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -90,8 +91,8 @@ typedef struct { grpc_closure *release_fd_cb; int *release_fd; - grpc_closure read_closure; - grpc_closure write_closure; + grpc_closure read_done_closure; + grpc_closure write_done_closure; char *peer_string; @@ -99,6 +100,145 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +typedef struct backup_poller { + gpr_mu *pollset_mu; + grpc_closure run_poller; +} backup_poller; + +#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset *)((b) + 1)) + +static gpr_atm g_uncovered_notifications_pending; +static gpr_atm g_backup_poller; /* backup_poller* */ + +static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); +static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); +static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx, + void *arg /* grpc_tcp */, + grpc_error *error); + +static void done_poller(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error_ignored) { + backup_poller *p = (backup_poller *)bp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p); + } + grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p)); + gpr_free(p); +} + +static void run_poller(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error_ignored) { + backup_poller *p = (backup_poller *)bp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); + } + gpr_mu_lock(p->pollset_mu); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN)); + GRPC_LOG_IF_ERROR("backup_poller:pollset_work", + grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, + now, deadline)); + gpr_mu_unlock(p->pollset_mu); + /* last "uncovered" notification is the ref that keeps us polling, if we get + * there try a cas to release it */ + if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 && + gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) { + gpr_mu_lock(p->pollset_mu); + bool cas_ok = gpr_atm_no_barrier_cas(&g_backup_poller, (gpr_atm)p, 0); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); + } + gpr_mu_unlock(p->pollset_mu); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); + } + grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), + GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, + grpc_schedule_on_exec_ctx)); + } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); + } + GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); + } +} + +static void drop_uncovered(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller); + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count, + (int)old_count - 1); + } + GPR_ASSERT(old_count != 1); +} + +static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p; + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count, + 2 + (int)old_count); + } + if (old_count == 0) { + p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); + } + grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); + gpr_atm_no_barrier_store(&g_backup_poller, (gpr_atm)p); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), + GRPC_ERROR_NONE); + } else { + p = (backup_poller *)gpr_atm_no_barrier_load(&g_backup_poller); + GPR_ASSERT(p != NULL); + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); + } + grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); + if (old_count != 0) { + drop_uncovered(exec_ctx, tcp); + } +} + +static void notify_on_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); + } + GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure); +} + +static void notify_on_write(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); + } + cover_self(exec_ctx, tcp); + GRPC_CLOSURE_INIT(&tcp->write_done_closure, + tcp_drop_uncovered_then_handle_write, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure); +} + +static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); + } + drop_uncovered(exec_ctx, (grpc_tcp *)arg); + tcp_handle_write(exec_ctx, arg, error); +} + static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { tcp->bytes_read_this_round += (double)bytes; } @@ -214,6 +354,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, grpc_closure *cb = tcp->read_cb; if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "read: error=%s", str); @@ -268,7 +409,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (errno == EAGAIN) { finish_estimate(tcp); /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -303,7 +444,11 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { - grpc_tcp *tcp = tcpp; + grpc_tcp *tcp = (grpc_tcp *)tcpp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, + grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -319,9 +464,15 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp); + } grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, target_read_size, 1, tcp->incoming_buffer); } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp); + } tcp_do_read(exec_ctx, tcp); } } @@ -330,6 +481,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -353,9 +507,9 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { - GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE); } } @@ -463,7 +617,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { cb = tcp->write_cb; tcp->write_cb = NULL; @@ -516,7 +670,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); @@ -593,7 +747,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { grpc_resource_quota_unref_internal(exec_ctx, resource_quota); resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); + (grpc_resource_quota *)channel_args->args[i].value.pointer.p); } } } @@ -623,10 +777,6 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; - GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp, - grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&tcp->last_read_buffer); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); grpc_resource_user_slice_allocator_init( diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index fc9c9f980f..bf7a5272cb 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -127,13 +127,11 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error); } -static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - security_handshaker *h = arg; - gpr_mu_lock(&h->mu); +static void on_peer_checked_inner(grpc_exec_ctx *exec_ctx, + security_handshaker *h, grpc_error *error) { if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked(exec_ctx, h, GRPC_ERROR_REF(error)); - goto done; + return; } // Create frame protector. tsi_frame_protector *protector; @@ -144,7 +142,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Frame protector creation failed"), result); security_handshake_failed_locked(exec_ctx, h, error); - goto done; + return; } // Get unused bytes. const unsigned char *unused_bytes = NULL; @@ -177,7 +175,13 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; -done: +} + +static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + security_handshaker *h = (security_handshaker *)arg; + gpr_mu_lock(&h->mu); + on_peer_checked_inner(exec_ctx, h, error); gpr_mu_unlock(&h->mu); security_handshaker_unref(exec_ctx, h); } @@ -239,7 +243,7 @@ static grpc_error *on_handshake_next_done_locked( static void on_handshake_next_done_grpc_wrapper( tsi_result result, void *user_data, const unsigned char *bytes_to_send, size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result) { - security_handshaker *h = user_data; + security_handshaker *h = (security_handshaker *)user_data; // This callback will be invoked by TSI in a non-grpc thread, so it's // safe to create our own exec_ctx here. grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -281,7 +285,7 @@ static grpc_error *do_handshaker_next_locked( static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - security_handshaker *h = arg; + security_handshaker *h = (security_handshaker *)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( @@ -298,7 +302,8 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, bytes_received_size += GRPC_SLICE_LENGTH(h->args->read_buffer->slices[i]); } if (bytes_received_size > h->handshake_buffer_size) { - h->handshake_buffer = gpr_realloc(h->handshake_buffer, bytes_received_size); + h->handshake_buffer = + (uint8_t *)gpr_realloc(h->handshake_buffer, bytes_received_size); h->handshake_buffer_size = bytes_received_size; } size_t offset = 0; @@ -323,7 +328,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx, static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - security_handshaker *h = arg; + security_handshaker *h = (security_handshaker *)arg; gpr_mu_lock(&h->mu); if (error != GRPC_ERROR_NONE || h->shutdown) { security_handshake_failed_locked( @@ -400,14 +405,15 @@ static const grpc_handshaker_vtable security_handshaker_vtable = { static grpc_handshaker *security_handshaker_create( grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, grpc_security_connector *connector) { - security_handshaker *h = gpr_zalloc(sizeof(security_handshaker)); + security_handshaker *h = + (security_handshaker *)gpr_zalloc(sizeof(security_handshaker)); grpc_handshaker_init(&security_handshaker_vtable, &h->base); h->handshaker = handshaker; h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); gpr_mu_init(&h->mu); gpr_ref_init(&h->refs, 1); h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; - h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); + h->handshake_buffer = (uint8_t *)gpr_malloc(h->handshake_buffer_size); GRPC_CLOSURE_INIT(&h->on_handshake_data_sent_to_peer, on_handshake_data_sent_to_peer, h, grpc_schedule_on_exec_ctx); @@ -450,7 +456,7 @@ static const grpc_handshaker_vtable fail_handshaker_vtable = { fail_handshaker_do_handshake}; static grpc_handshaker *fail_handshaker_create() { - grpc_handshaker *h = gpr_malloc(sizeof(*h)); + grpc_handshaker *h = (grpc_handshaker *)gpr_malloc(sizeof(*h)); grpc_handshaker_init(&fail_handshaker_vtable, h); return h; } diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 66dcc299aa..f8c84fb4a8 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1116,9 +1116,11 @@ void grpc_server_start(grpc_server *server) { server_ref(server); server->starting = true; - GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server, - grpc_executor_scheduler), - GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + &exec_ctx, + GRPC_CLOSURE_CREATE(start_listeners, server, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)), + GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 6c61f4b8d9..0ca7688ca4 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -72,7 +72,8 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, cope with. Throw this over to the executor (on a core-owned thread) and process it there. */ - refcount->destroy.scheduler = grpc_executor_scheduler; + refcount->destroy.scheduler = + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT); } GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); } |