diff options
Diffstat (limited to 'src/core/lib/iomgr')
23 files changed, 448 insertions, 155 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 26f9cbe0fa..7236e23cf7 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -109,7 +109,7 @@ typedef struct { static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - wrapped_closure *wc = arg; + wrapped_closure *wc = (wrapped_closure *)arg; grpc_iomgr_cb_func cb = wc->cb; void *cb_arg = wc->cb_arg; gpr_free(wc); @@ -124,7 +124,7 @@ grpc_closure *grpc_closure_create(const char *file, int line, grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg, grpc_closure_scheduler *scheduler) { #endif - wrapped_closure *wc = gpr_malloc(sizeof(*wc)); + wrapped_closure *wc = (wrapped_closure *)gpr_malloc(sizeof(*wc)); wc->cb = cb; wc->cb_arg = cb_arg; #ifndef NDEBUG diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index 4c1503bddb..360967f3ba 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -74,14 +74,15 @@ static const grpc_closure_scheduler_vtable finally_scheduler = { static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); grpc_combiner *grpc_combiner_create(void) { - grpc_combiner *lock = gpr_zalloc(sizeof(*lock)); + grpc_combiner *lock = (grpc_combiner *)gpr_zalloc(sizeof(*lock)); gpr_ref_init(&lock->refs, 1); lock->scheduler.vtable = &scheduler; lock->finally_scheduler.vtable = &finally_scheduler; gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler); + GRPC_CLOSURE_INIT(&lock->offload, offload, lock, + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -193,7 +194,7 @@ static void move_next(grpc_exec_ctx *exec_ctx) { } static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_combiner *lock = arg; + grpc_combiner *lock = (grpc_combiner *)arg; push_last_on_exec_ctx(exec_ctx, lock); } diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 3759dda992..dcd175a2e1 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -211,7 +211,7 @@ static uint8_t get_placement(grpc_error **err, size_t size) { #ifndef NDEBUG grpc_error *orig = *err; #endif - *err = gpr_realloc( + *err = (grpc_error *)gpr_realloc( *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t)); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_error_refcount)) { @@ -406,7 +406,8 @@ static grpc_error *copy_error_and_unref(grpc_error *in) { if (in->arena_capacity - in->arena_size < (uint8_t)SLOTS_PER_STR) { new_arena_capacity = (uint8_t)(3 * new_arena_capacity / 2); } - out = gpr_malloc(sizeof(*in) + new_arena_capacity * sizeof(intptr_t)); + out = (grpc_error *)gpr_malloc(sizeof(*in) + + new_arena_capacity * sizeof(intptr_t)); #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_error_refcount)) { gpr_log(GPR_DEBUG, "%p create copying %p", out, in); @@ -530,7 +531,7 @@ typedef struct { static void append_chr(char c, char **s, size_t *sz, size_t *cap) { if (*sz == *cap) { *cap = GPR_MAX(8, 3 * *cap / 2); - *s = gpr_realloc(*s, *cap); + *s = (char *)gpr_realloc(*s, *cap); } (*s)[(*sz)++] = c; } @@ -582,7 +583,8 @@ static void append_esc_str(const uint8_t *str, size_t len, char **s, size_t *sz, static void append_kv(kv_pairs *kvs, char *key, char *value) { if (kvs->num_kvs == kvs->cap_kvs) { kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4); - kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); + kvs->kvs = + (kv_pair *)gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); } kvs->kvs[kvs->num_kvs].key = key; kvs->kvs[kvs->num_kvs].value = value; @@ -695,8 +697,8 @@ static char *errs_string(grpc_error *err) { } static int cmp_kvs(const void *a, const void *b) { - const kv_pair *ka = a; - const kv_pair *kb = b; + const kv_pair *ka = (const kv_pair *)a; + const kv_pair *kb = (const kv_pair *)b; return strcmp(ka->key, kb->key); } diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index b76eb9e1c9..5bc7e878de 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -260,7 +260,7 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_mu_unlock(&fd_freelist_mu); if (new_fd == NULL) { - new_fd = gpr_malloc(sizeof(grpc_fd)); + new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } new_fd->fd = fd; @@ -442,8 +442,8 @@ static grpc_error *pollset_global_init(void) { return GRPC_OS_ERROR(errno, "epoll_ctl"); } g_num_neighbourhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBOURHOODS); - g_neighbourhoods = - gpr_zalloc(sizeof(*g_neighbourhoods) * g_num_neighbourhoods); + g_neighbourhoods = (pollset_neighbourhood *)gpr_zalloc( + sizeof(*g_neighbourhoods) * g_num_neighbourhoods); for (size_t i = 0; i < g_num_neighbourhoods; i++) { gpr_mu_init(&g_neighbourhoods[i].mu); } diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index 0e42f76af3..277347ac70 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -279,7 +279,7 @@ static void ref_by(grpc_fd *fd, int n) { } static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_fd *fd = arg; + grpc_fd *fd = (grpc_fd *)arg; /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); pollable_destroy(&fd->pollable); @@ -340,7 +340,7 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_mu_unlock(&fd_freelist_mu); if (new_fd == NULL) { - new_fd = gpr_malloc(sizeof(grpc_fd)); + new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); } pollable_init(&new_fd->pollable, PO_FD); @@ -556,7 +556,7 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, static void do_kick_all(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_unused) { grpc_error *error = GRPC_ERROR_NONE; - grpc_pollset *pollset = arg; + grpc_pollset *pollset = (grpc_pollset *)arg; gpr_mu_lock(&pollset->pollable.po.mu); if (pollset->root_worker != NULL) { grpc_pollset_worker *worker = pollset->root_worker; @@ -1012,7 +1012,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void unref_fd_no_longer_poller(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_fd *fd = arg; + grpc_fd *fd = (grpc_fd *)arg; UNREF_BY(exec_ctx, fd, 2, "pollset_pollable"); } @@ -1081,7 +1081,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, */ static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pss = gpr_zalloc(sizeof(*pss)); + grpc_pollset_set *pss = (grpc_pollset_set *)gpr_zalloc(sizeof(*pss)); po_init(&pss->po, PO_POLLSET_SET); return pss; } @@ -1243,7 +1243,7 @@ static void pg_broadcast(grpc_exec_ctx *exec_ctx, polling_group *from, static void pg_create(grpc_exec_ctx *exec_ctx, polling_obj **initial_po, size_t initial_po_count) { /* assumes all polling objects in initial_po are locked */ - polling_group *pg = gpr_malloc(sizeof(*pg)); + polling_group *pg = (polling_group *)gpr_malloc(sizeof(*pg)); po_init(&pg->po, PO_POLLING_GROUP); gpr_ref_init(&pg->refs, (int)initial_po_count); for (size_t i = 0; i < initial_po_count; i++) { @@ -1353,7 +1353,7 @@ static void pg_merge(grpc_exec_ctx *exec_ctx, polling_group *a, gpr_mu_lock(&po->mu); if (unref_count == unref_cap) { unref_cap = GPR_MAX(8, 3 * unref_cap / 2); - unref = gpr_realloc(unref, unref_cap * sizeof(*unref)); + unref = (polling_group **)gpr_realloc(unref, unref_cap * sizeof(*unref)); } unref[unref_count++] = po->group; po->group = pg_ref(a); diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 59c7cdc285..b88c3ba111 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -363,7 +363,8 @@ static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds, if (pi->fd_cnt == pi->fd_capacity) { pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2); - pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity); + pi->fds = + (grpc_fd **)gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity); } pi->fds[pi->fd_cnt++] = fds[i]; @@ -466,7 +467,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, *error = GRPC_ERROR_NONE; - pi = gpr_malloc(sizeof(*pi)); + pi = (polling_island *)gpr_malloc(sizeof(*pi)); gpr_mu_init(&pi->mu); pi->fd_cnt = 0; pi->fd_capacity = 0; @@ -810,7 +811,7 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_mu_unlock(&fd_freelist_mu); if (new_fd == NULL) { - new_fd = gpr_malloc(sizeof(grpc_fd)); + new_fd = (grpc_fd *)gpr_malloc(sizeof(grpc_fd)); gpr_mu_init(&new_fd->po.mu); } @@ -1273,7 +1274,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, to the function pollset_work_and_unlock() will pick up the correct epoll_fd */ } else { - grpc_fd *fd = data_ptr; + grpc_fd *fd = (grpc_fd *)data_ptr; int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); int write_ev = ep_ev[i].events & EPOLLOUT; @@ -1569,7 +1570,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, */ static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); + grpc_pollset_set *pss = (grpc_pollset_set *)gpr_malloc(sizeof(*pss)); gpr_mu_init(&pss->po.mu); pss->po.pi = NULL; #ifndef NDEBUG @@ -1647,8 +1648,8 @@ void *grpc_pollset_get_polling_island(grpc_pollset *ps) { } bool grpc_are_polling_islands_equal(void *p, void *q) { - polling_island *p1 = p; - polling_island *p2 = q; + polling_island *p1 = (polling_island *)p; + polling_island *p2 = (polling_island *)q; /* Note: polling_island_lock_pair() may change p1 and p2 to point to the latest polling islands in their respective linked lists */ diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index fbd265f3ce..bcf1d9001b 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -327,7 +327,7 @@ static void unref_by(grpc_fd *fd, int n) { } static grpc_fd *fd_create(int fd, const char *name) { - grpc_fd *r = gpr_malloc(sizeof(*r)); + grpc_fd *r = (grpc_fd *)gpr_malloc(sizeof(*r)); gpr_mu_init(&r->mu); gpr_atm_rel_store(&r->refst, 1); r->shutdown = 0; @@ -842,8 +842,8 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->fd_count == pollset->fd_capacity) { pollset->fd_capacity = GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2); - pollset->fds = - gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity); + pollset->fds = (grpc_fd **)gpr_realloc( + pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity); } pollset->fds[pollset->fd_count++] = fd; GRPC_FD_REF(fd, "multipoller"); @@ -895,7 +895,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker.wakeup_fd = pollset->local_wakeup_cache; pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { - worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + worker.wakeup_fd = + (grpc_cached_wakeup_fd *)gpr_malloc(sizeof(*worker.wakeup_fd)); error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); if (error != GRPC_ERROR_NONE) { GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error)); @@ -950,8 +951,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2); const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2); void *buf = gpr_malloc(pfd_size + watch_size); - pfds = buf; - watchers = (void *)((char *)buf + pfd_size); + pfds = (struct pollfd *)buf; + watchers = (grpc_fd_watcher *)(void *)((char *)buf + pfd_size); } fd_count = 0; @@ -988,6 +989,10 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, r = grpc_poll_function(pfds, pfd_count, timeout); GRPC_SCHEDULING_END_BLOCKING_REGION; + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p poll=%d", pollset, r); + } + if (r < 0) { if (errno != EINTR) { work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); @@ -1008,6 +1013,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } else { if (pfds[0].revents & POLLIN_CHECK) { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p: got_wakeup", pollset); + } work_combine_error( &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd)); } @@ -1015,6 +1023,11 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (watchers[i].fd == NULL) { fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); } else { + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_log(GPR_DEBUG, "%p got_event: %d r:%d w:%d [%d]", pollset, + pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, + (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); + } fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } @@ -1131,7 +1144,8 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, */ static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pollset_set = gpr_zalloc(sizeof(*pollset_set)); + grpc_pollset_set *pollset_set = + (grpc_pollset_set *)gpr_zalloc(sizeof(*pollset_set)); gpr_mu_init(&pollset_set->mu); return pollset_set; } @@ -1174,9 +1188,9 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, if (pollset_set->pollset_count == pollset_set->pollset_capacity) { pollset_set->pollset_capacity = GPR_MAX(8, 2 * pollset_set->pollset_capacity); - pollset_set->pollsets = - gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity * - sizeof(*pollset_set->pollsets)); + pollset_set->pollsets = (grpc_pollset **)gpr_realloc( + pollset_set->pollsets, + pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)); } pollset_set->pollsets[pollset_set->pollset_count++] = pollset; for (i = 0, j = 0; i < pollset_set->fd_count; i++) { @@ -1225,9 +1239,9 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&bag->mu); if (bag->pollset_set_count == bag->pollset_set_capacity) { bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = - gpr_realloc(bag->pollset_sets, - bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); + bag->pollset_sets = (grpc_pollset_set **)gpr_realloc( + bag->pollset_sets, + bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); } bag->pollset_sets[bag->pollset_set_count++] = item; for (i = 0, j = 0; i < bag->fd_count; i++) { @@ -1264,7 +1278,7 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&pollset_set->mu); if (pollset_set->fd_count == pollset_set->fd_capacity) { pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); - pollset_set->fds = gpr_realloc( + pollset_set->fds = (grpc_fd **)gpr_realloc( pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); } GRPC_FD_REF(fd, "pollset_set"); @@ -1318,11 +1332,12 @@ static void cache_insert_locked(poll_args *args) { } static void init_result(poll_args *pargs) { - pargs->result = gpr_malloc(sizeof(poll_result)); + pargs->result = (poll_result *)gpr_malloc(sizeof(poll_result)); gpr_ref_init(&pargs->result->refcount, 1); pargs->result->watchers = NULL; pargs->result->watchcount = 0; - pargs->result->fds = gpr_malloc(sizeof(struct pollfd) * pargs->nfds); + pargs->result->fds = + (struct pollfd *)gpr_malloc(sizeof(struct pollfd) * pargs->nfds); memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds); pargs->result->nfds = pargs->nfds; pargs->result->retval = 0; @@ -1361,7 +1376,7 @@ static poll_args *get_poller_locked(struct pollfd *fds, nfds_t count) { return pargs; } - poll_args *pargs = gpr_malloc(sizeof(struct poll_args)); + poll_args *pargs = (poll_args *)gpr_malloc(sizeof(struct poll_args)); gpr_cv_init(&pargs->trigger); pargs->fds = fds; pargs->nfds = count; @@ -1408,7 +1423,8 @@ static void cache_poller_locked(poll_args *args) { poll_args **old_active_pollers = poll_cache.active_pollers; poll_cache.size = poll_cache.size * 2; poll_cache.count = 0; - poll_cache.active_pollers = gpr_malloc(sizeof(void *) * poll_cache.size); + poll_cache.active_pollers = + (poll_args **)gpr_malloc(sizeof(void *) * poll_cache.size); for (unsigned int i = 0; i < poll_cache.size; i++) { poll_cache.active_pollers[i] = NULL; } @@ -1513,12 +1529,12 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { nfds_t nsockfds = 0; poll_result *result = NULL; gpr_mu_lock(&g_cvfds.mu); - pollcv = gpr_malloc(sizeof(cv_node)); + pollcv = (cv_node *)gpr_malloc(sizeof(cv_node)); pollcv->next = NULL; gpr_cv pollcv_cv; gpr_cv_init(&pollcv_cv); pollcv->cv = &pollcv_cv; - cv_node *fd_cvs = gpr_malloc(nfds * sizeof(cv_node)); + cv_node *fd_cvs = (cv_node *)gpr_malloc(nfds * sizeof(cv_node)); for (i = 0; i < nfds; i++) { fds[i].revents = 0; @@ -1550,7 +1566,8 @@ static int cvfd_poll(struct pollfd *fds, nfds_t nfds, int timeout) { res = 0; if (!skip_poll && nsockfds > 0) { - struct pollfd *pollfds = gpr_malloc(sizeof(struct pollfd) * nsockfds); + struct pollfd *pollfds = + (struct pollfd *)gpr_malloc(sizeof(struct pollfd) * nsockfds); idx = 0; for (i = 0; i < nfds; i++) { if (fds[i].fd >= 0) { @@ -1613,7 +1630,8 @@ static void global_cv_fd_table_init() { gpr_cv_init(&g_cvfds.shutdown_cv); gpr_ref_init(&g_cvfds.pollcount, 1); g_cvfds.size = CV_DEFAULT_TABLE_SIZE; - g_cvfds.cvfds = gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); + g_cvfds.cvfds = + (fd_node *)gpr_malloc(sizeof(fd_node) * CV_DEFAULT_TABLE_SIZE); g_cvfds.free_fds = NULL; thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN); for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) { @@ -1630,7 +1648,7 @@ static void global_cv_fd_table_init() { poll_cache.size = 32; poll_cache.count = 0; poll_cache.free_pollers = NULL; - poll_cache.active_pollers = gpr_malloc(sizeof(void *) * 32); + poll_cache.active_pollers = (poll_args **)gpr_malloc(sizeof(void *) * 32); for (unsigned int i = 0; i < poll_cache.size; i++) { poll_cache.active_pollers[i] = NULL; } diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index bb43061ff5..d881e2d4dd 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -76,10 +76,10 @@ static void add(const char *beg, const char *end, char ***ss, size_t *ns) { size_t len; GPR_ASSERT(end >= beg); len = (size_t)(end - beg); - s = gpr_malloc(len + 1); + s = (char *)gpr_malloc(len + 1); memcpy(s, beg, len); s[len] = 0; - *ss = gpr_realloc(*ss, sizeof(char **) * np); + *ss = (char **)gpr_realloc(*ss, sizeof(char **) * np); (*ss)[n] = s; *ns = np; } diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index dd5cb2a64e..892385d7d7 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -40,6 +40,7 @@ typedef struct { grpc_closure_list elems; size_t depth; bool shutdown; + bool queued_long_job; gpr_thd_id id; } thread_state; @@ -50,6 +51,9 @@ static gpr_spinlock g_adding_thread_lock = GPR_SPINLOCK_STATIC_INITIALIZER; GPR_TLS_DECL(g_this_thread_state); +static grpc_tracer_flag executor_trace = + GRPC_TRACER_INITIALIZER(false, "executor"); + static void executor_thread(void *arg); static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { @@ -59,6 +63,14 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { while (c != NULL) { grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: run %p [created by %s:%d]", c, + c->file_created, c->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: run %p", c); +#endif + } #ifndef NDEBUG c->scheduled = false; #endif @@ -66,6 +78,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { GRPC_ERROR_UNREF(error); c = next; n++; + grpc_exec_ctx_flush(exec_ctx); } return n; @@ -82,7 +95,8 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { g_max_threads = GPR_MAX(1, 2 * gpr_cpu_num_cores()); gpr_atm_no_barrier_store(&g_cur_threads, 1); gpr_tls_init(&g_this_thread_state); - g_thread_state = gpr_zalloc(sizeof(thread_state) * g_max_threads); + g_thread_state = + (thread_state *)gpr_zalloc(sizeof(thread_state) * g_max_threads); for (size_t i = 0; i < g_max_threads; i++) { gpr_mu_init(&g_thread_state[i].mu); gpr_cv_init(&g_thread_state[i].cv); @@ -120,6 +134,7 @@ void grpc_executor_set_threading(grpc_exec_ctx *exec_ctx, bool threading) { } void grpc_executor_init(grpc_exec_ctx *exec_ctx) { + grpc_register_tracer(&executor_trace); gpr_atm_no_barrier_store(&g_cur_threads, 0); grpc_executor_set_threading(exec_ctx, true); } @@ -129,7 +144,7 @@ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) { } static void executor_thread(void *arg) { - thread_state *ts = arg; + thread_state *ts = (thread_state *)arg; gpr_tls_set(&g_this_thread_state, (intptr_t)ts); grpc_exec_ctx exec_ctx = @@ -137,12 +152,21 @@ static void executor_thread(void *arg) { size_t subtract_depth = 0; for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: step (sub_depth=%" PRIdPTR ")", + (int)(ts - g_thread_state), subtract_depth); + } gpr_mu_lock(&ts->mu); ts->depth -= subtract_depth; while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) { + ts->queued_long_job = false; gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } if (ts->shutdown) { + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: shutdown", + (int)(ts - g_thread_state)); + } gpr_mu_unlock(&ts->mu); break; } @@ -150,52 +174,128 @@ static void executor_thread(void *arg) { grpc_closure_list exec = ts->elems; ts->elems = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT; gpr_mu_unlock(&ts->mu); + if (GRPC_TRACER_ON(executor_trace)) { + gpr_log(GPR_DEBUG, "EXECUTOR[%d]: execute", (int)(ts - g_thread_state)); + } subtract_depth = run_closures(&exec_ctx, exec); - grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); } static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - GRPC_STATS_INC_EXECUTOR_SCHEDULED_ITEMS(exec_ctx); - if (cur_thread_count == 0) { - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); - return; - } - thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); - if (ts == NULL) { - ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + grpc_error *error, bool is_short) { + bool retry_push; + if (is_short) { + GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS(exec_ctx); } else { - GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); - } - gpr_mu_lock(&ts->mu); - if (grpc_closure_list_empty(ts->elems)) { - GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); - gpr_cv_signal(&ts->cv); + GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS(exec_ctx); } - grpc_closure_list_append(&ts->elems, closure, error); - ts->depth++; - bool try_new_thread = ts->depth > MAX_DEPTH && - cur_thread_count < g_max_threads && !ts->shutdown; - gpr_mu_unlock(&ts->mu); - if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { - cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); - if (cur_thread_count < g_max_threads) { - gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); - - gpr_thd_options opt = gpr_thd_options_default(); - gpr_thd_options_set_joinable(&opt); - gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, - &g_thread_state[cur_thread_count], &opt); + do { + retry_push = false; + size_t cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count == 0) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p (created %s:%d) inline", + closure, closure->file_created, closure->line_created); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: schedule %p inline", closure); +#endif + } + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + return; } - gpr_spinlock_unlock(&g_adding_thread_lock); - } + thread_state *ts = (thread_state *)gpr_tls_get(&g_this_thread_state); + if (ts == NULL) { + ts = &g_thread_state[GPR_HASH_POINTER(exec_ctx, cur_thread_count)]; + } else { + GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF(exec_ctx); + } + thread_state *orig_ts = ts; + + bool try_new_thread; + for (;;) { + if (GRPC_TRACER_ON(executor_trace)) { +#ifndef NDEBUG + gpr_log( + GPR_DEBUG, + "EXECUTOR: try to schedule %p (%s) (created %s:%d) to thread %d", + closure, is_short ? "short" : "long", closure->file_created, + closure->line_created, (int)(ts - g_thread_state)); +#else + gpr_log(GPR_DEBUG, "EXECUTOR: try to schedule %p (%s) to thread %d", + closure, is_short ? "short" : "long", + (int)(ts - g_thread_state)); +#endif + } + gpr_mu_lock(&ts->mu); + if (ts->queued_long_job) { + // if there's a long job queued, we never queue anything else to this + // queue (since long jobs can take 'infinite' time and we need to + // guarantee no starvation) + // ... spin through queues and try again + gpr_mu_unlock(&ts->mu); + size_t idx = (size_t)(ts - g_thread_state); + ts = &g_thread_state[(idx + 1) % cur_thread_count]; + if (ts == orig_ts) { + retry_push = true; + try_new_thread = true; + break; + } + continue; + } + if (grpc_closure_list_empty(ts->elems)) { + GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED(exec_ctx); + gpr_cv_signal(&ts->cv); + } + grpc_closure_list_append(&ts->elems, closure, error); + ts->depth++; + try_new_thread = ts->depth > MAX_DEPTH && + cur_thread_count < g_max_threads && !ts->shutdown; + if (!is_short) ts->queued_long_job = true; + gpr_mu_unlock(&ts->mu); + break; + } + if (try_new_thread && gpr_spinlock_trylock(&g_adding_thread_lock)) { + cur_thread_count = (size_t)gpr_atm_no_barrier_load(&g_cur_threads); + if (cur_thread_count < g_max_threads) { + gpr_atm_no_barrier_store(&g_cur_threads, cur_thread_count + 1); + + gpr_thd_options opt = gpr_thd_options_default(); + gpr_thd_options_set_joinable(&opt); + gpr_thd_new(&g_thread_state[cur_thread_count].id, executor_thread, + &g_thread_state[cur_thread_count], &opt); + } + gpr_spinlock_unlock(&g_adding_thread_lock); + } + if (retry_push) { + GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES(exec_ctx); + } + } while (retry_push); } -static const grpc_closure_scheduler_vtable executor_vtable = { - executor_push, executor_push, "executor"}; -static grpc_closure_scheduler executor_scheduler = {&executor_vtable}; -grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler; +static void executor_push_short(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, true); +} + +static void executor_push_long(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error) { + executor_push(exec_ctx, closure, error, false); +} + +static const grpc_closure_scheduler_vtable executor_vtable_short = { + executor_push_short, executor_push_short, "executor"}; +static grpc_closure_scheduler executor_scheduler_short = { + &executor_vtable_short}; + +static const grpc_closure_scheduler_vtable executor_vtable_long = { + executor_push_long, executor_push_long, "executor"}; +static grpc_closure_scheduler executor_scheduler_long = {&executor_vtable_long}; + +grpc_closure_scheduler *grpc_executor_scheduler( + grpc_executor_job_length length) { + return length == GRPC_EXECUTOR_SHORT ? &executor_scheduler_short + : &executor_scheduler_long; +} diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h index c3382a0a12..0412c02790 100644 --- a/src/core/lib/iomgr/executor.h +++ b/src/core/lib/iomgr/executor.h @@ -21,6 +21,11 @@ #include "src/core/lib/iomgr/closure.h" +typedef enum { + GRPC_EXECUTOR_SHORT, + GRPC_EXECUTOR_LONG +} grpc_executor_job_length; + /** Initialize the global executor. * * This mechanism is meant to outsource work (grpc_closure instances) to a @@ -28,7 +33,7 @@ * non-blocking solution available. */ void grpc_executor_init(grpc_exec_ctx *exec_ctx); -extern grpc_closure_scheduler *grpc_executor_scheduler; +grpc_closure_scheduler *grpc_executor_scheduler(grpc_executor_job_length); /** Shutdown the executor, running all pending work as part of the call */ void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx); diff --git a/src/core/lib/iomgr/load_file.c b/src/core/lib/iomgr/load_file.c index ba77a52afc..0b4d41ea4b 100644 --- a/src/core/lib/iomgr/load_file.c +++ b/src/core/lib/iomgr/load_file.c @@ -47,7 +47,8 @@ grpc_error *grpc_load_file(const char *filename, int add_null_terminator, /* Converting to size_t on the assumption that it will not fail */ contents_size = (size_t)ftell(file); fseek(file, 0, SEEK_SET); - contents = gpr_malloc(contents_size + (add_null_terminator ? 1 : 0)); + contents = (unsigned char *)gpr_malloc(contents_size + + (add_null_terminator ? 1 : 0)); bytes_read = fread(contents, 1, contents_size, file); if (bytes_read < contents_size) { error = GRPC_OS_ERROR(errno, "fread"); diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 35dedc23de..082e3b7947 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -112,13 +112,14 @@ static grpc_error *blocking_resolve_address_impl( } /* Success path: set addrs non-NULL, fill it in */ - *addresses = gpr_malloc(sizeof(grpc_resolved_addresses)); + *addresses = + (grpc_resolved_addresses *)gpr_malloc(sizeof(grpc_resolved_addresses)); (*addresses)->naddrs = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { (*addresses)->naddrs++; } - (*addresses)->addrs = - gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs); + (*addresses)->addrs = (grpc_resolved_address *)gpr_malloc( + sizeof(grpc_resolved_address) * (*addresses)->naddrs); i = 0; for (resp = result; resp != NULL; resp = resp->ai_next) { memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); @@ -153,7 +154,7 @@ typedef struct { * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { - request *r = rp; + request *r = (request *)rp; GRPC_CLOSURE_SCHED( exec_ctx, r->on_done, grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out)); @@ -174,9 +175,9 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_pollset_set *interested_parties, grpc_closure *on_done, grpc_resolved_addresses **addrs) { - request *r = gpr_malloc(sizeof(request)); + request *r = (request *)gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler); + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 45cfd7248d..0cb0029f4e 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -159,7 +159,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_resolved_addresses **addresses) { request *r = gpr_malloc(sizeof(request)); GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, - grpc_executor_scheduler); + grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index a31d9eef93..6c58986b53 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -241,7 +241,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota, bool destructive); static void rq_step(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) { - grpc_resource_quota *resource_quota = rq; + grpc_resource_quota *resource_quota = (grpc_resource_quota *)rq; resource_quota->step_scheduled = false; do { if (rq_alloc(exec_ctx, resource_quota)) goto done; @@ -380,12 +380,12 @@ typedef struct { } ru_slice_refcount; static void ru_slice_ref(void *p) { - ru_slice_refcount *rc = p; + ru_slice_refcount *rc = (ru_slice_refcount *)p; gpr_ref(&rc->refs); } static void ru_slice_unref(grpc_exec_ctx *exec_ctx, void *p) { - ru_slice_refcount *rc = p; + ru_slice_refcount *rc = (ru_slice_refcount *)p; if (gpr_unref(&rc->refs)) { grpc_resource_user_free(exec_ctx, rc->resource_user, rc->size); gpr_free(rc); @@ -398,7 +398,8 @@ static const grpc_slice_refcount_vtable ru_slice_vtable = { static grpc_slice ru_slice_create(grpc_resource_user *resource_user, size_t size) { - ru_slice_refcount *rc = gpr_malloc(sizeof(ru_slice_refcount) + size); + ru_slice_refcount *rc = + (ru_slice_refcount *)gpr_malloc(sizeof(ru_slice_refcount) + size); rc->base.vtable = &ru_slice_vtable; rc->base.sub_refcount = &rc->base; gpr_ref_init(&rc->refs, 1); @@ -417,7 +418,7 @@ static grpc_slice ru_slice_create(grpc_resource_user *resource_user, */ static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = ru; + grpc_resource_user *resource_user = (grpc_resource_user *)ru; if (rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION)) { rq_step_sched(exec_ctx, resource_user->resource_quota); @@ -427,7 +428,7 @@ static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = ru; + grpc_resource_user *resource_user = (grpc_resource_user *)ru; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && rulist_empty(resource_user->resource_quota, @@ -454,7 +455,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx, static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = ru; + grpc_resource_user *resource_user = (grpc_resource_user *)ru; if (!ru_post_reclaimer(exec_ctx, resource_user, false)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && @@ -469,7 +470,7 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = ru; + grpc_resource_user *resource_user = (grpc_resource_user *)ru; if (!ru_post_reclaimer(exec_ctx, resource_user, true)) return; if (!rulist_empty(resource_user->resource_quota, GRPC_RULIST_AWAITING_ALLOCATION) && @@ -485,7 +486,7 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, } static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = ru; + grpc_resource_user *resource_user = (grpc_resource_user *)ru; GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1], @@ -497,7 +498,7 @@ static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { } static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { - grpc_resource_user *resource_user = ru; + grpc_resource_user *resource_user = (grpc_resource_user *)ru; GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); @@ -518,7 +519,8 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_resource_user_slice_allocator *slice_allocator = arg; + grpc_resource_user_slice_allocator *slice_allocator = + (grpc_resource_user_slice_allocator *)arg; if (error == GRPC_ERROR_NONE) { for (size_t i = 0; i < slice_allocator->count; i++) { grpc_slice_buffer_add_indexed( @@ -541,7 +543,7 @@ typedef struct { } rq_resize_args; static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - rq_resize_args *a = args; + rq_resize_args *a = (rq_resize_args *)args; int64_t delta = a->size - a->resource_quota->size; a->resource_quota->size += delta; a->resource_quota->free_pool += delta; @@ -553,7 +555,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) { - grpc_resource_quota *resource_quota = rq; + grpc_resource_quota *resource_quota = (grpc_resource_quota *)rq; resource_quota->reclaiming = false; rq_step_sched(exec_ctx, resource_quota); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); @@ -565,7 +567,8 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq, /* Public API */ grpc_resource_quota *grpc_resource_quota_create(const char *name) { - grpc_resource_quota *resource_quota = gpr_malloc(sizeof(*resource_quota)); + grpc_resource_quota *resource_quota = + (grpc_resource_quota *)gpr_malloc(sizeof(*resource_quota)); gpr_ref_init(&resource_quota->refs, 1); resource_quota->combiner = grpc_combiner_create(); resource_quota->free_pool = INT64_MAX; @@ -629,7 +632,7 @@ double grpc_resource_quota_get_memory_pressure( void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - rq_resize_args *a = gpr_malloc(sizeof(*a)); + rq_resize_args *a = (rq_resize_args *)gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_ref_internal(resource_quota); a->size = (int64_t)size; gpr_atm_no_barrier_store(&resource_quota->last_size, @@ -684,7 +687,8 @@ const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) { grpc_resource_user *grpc_resource_user_create( grpc_resource_quota *resource_quota, const char *name) { - grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user)); + grpc_resource_user *resource_user = + (grpc_resource_user *)gpr_malloc(sizeof(*resource_user)); resource_user->resource_quota = grpc_resource_quota_ref_internal(resource_quota); GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate, diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index a25fba4527..39dbb506e2 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -80,7 +80,8 @@ static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd, for (size_t i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_SOCKET_MUTATOR)) { GPR_ASSERT(channel_args->args[i].type == GRPC_ARG_POINTER); - grpc_socket_mutator *mutator = channel_args->args[i].value.pointer.p; + grpc_socket_mutator *mutator = + (grpc_socket_mutator *)channel_args->args[i].value.pointer.p; err = grpc_set_socket_with_mutator(fd, mutator); if (err != GRPC_ERROR_NONE) goto error; } @@ -98,7 +99,7 @@ done: static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { int done; - async_connect *ac = acp; + async_connect *ac = (async_connect *)acp; if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", ac->addr_str, @@ -126,7 +127,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd( } static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { - async_connect *ac = acp; + async_connect *ac = (async_connect *)acp; int so_error = 0; socklen_t so_error_size; int err; @@ -304,7 +305,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_pollset_set_add_fd(exec_ctx, interested_parties, fdobj); - ac = gpr_malloc(sizeof(async_connect)); + ac = (async_connect *)gpr_malloc(sizeof(async_connect)); ac->closure = closure; ac->ep = ep; ac->fd = fdobj; diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 3372e14eef..7e271294fd 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -43,6 +43,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" @@ -90,8 +91,8 @@ typedef struct { grpc_closure *release_fd_cb; int *release_fd; - grpc_closure read_closure; - grpc_closure write_closure; + grpc_closure read_done_closure; + grpc_closure write_done_closure; char *peer_string; @@ -99,6 +100,148 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +typedef struct backup_poller { + gpr_mu *pollset_mu; + grpc_closure run_poller; +} backup_poller; + +#define BACKUP_POLLER_POLLSET(b) ((grpc_pollset *)((b) + 1)) + +static gpr_atm g_uncovered_notifications_pending; +static gpr_atm g_backup_poller; /* backup_poller* */ + +static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); +static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, + grpc_error *error); +static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx, + void *arg /* grpc_tcp */, + grpc_error *error); + +static void done_poller(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error_ignored) { + backup_poller *p = (backup_poller *)bp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p destroy", p); + } + grpc_pollset_destroy(exec_ctx, BACKUP_POLLER_POLLSET(p)); + gpr_free(p); +} + +static void run_poller(grpc_exec_ctx *exec_ctx, void *bp, + grpc_error *error_ignored) { + backup_poller *p = (backup_poller *)bp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p run", p); + } + gpr_mu_lock(p->pollset_mu); + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec deadline = + gpr_time_add(now, gpr_time_from_seconds(10, GPR_TIMESPAN)); + GRPC_STATS_INC_TCP_BACKUP_POLLER_POLLS(exec_ctx); + GRPC_LOG_IF_ERROR("backup_poller:pollset_work", + grpc_pollset_work(exec_ctx, BACKUP_POLLER_POLLSET(p), NULL, + now, deadline)); + gpr_mu_unlock(p->pollset_mu); + /* last "uncovered" notification is the ref that keeps us polling, if we get + * there try a cas to release it */ + if (gpr_atm_no_barrier_load(&g_uncovered_notifications_pending) == 1 && + gpr_atm_full_cas(&g_uncovered_notifications_pending, 1, 0)) { + gpr_mu_lock(p->pollset_mu); + bool cas_ok = gpr_atm_full_cas(&g_backup_poller, (gpr_atm)p, 0); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p done cas_ok=%d", p, cas_ok); + } + gpr_mu_unlock(p->pollset_mu); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p shutdown", p); + } + grpc_pollset_shutdown(exec_ctx, BACKUP_POLLER_POLLSET(p), + GRPC_CLOSURE_INIT(&p->run_poller, done_poller, p, + grpc_schedule_on_exec_ctx)); + } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p reschedule", p); + } + GRPC_CLOSURE_SCHED(exec_ctx, &p->run_poller, GRPC_ERROR_NONE); + } +} + +static void drop_uncovered(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller); + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, -1); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p uncover cnt %d->%d", p, (int)old_count, + (int)old_count - 1); + } + GPR_ASSERT(old_count != 1); +} + +static void cover_self(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + backup_poller *p; + gpr_atm old_count = + gpr_atm_no_barrier_fetch_add(&g_uncovered_notifications_pending, 2); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER: cover cnt %d->%d", (int)old_count, + 2 + (int)old_count); + } + if (old_count == 0) { + GRPC_STATS_INC_TCP_BACKUP_POLLERS_CREATED(exec_ctx); + p = (backup_poller *)gpr_malloc(sizeof(*p) + grpc_pollset_size()); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p create", p); + } + grpc_pollset_init(BACKUP_POLLER_POLLSET(p), &p->pollset_mu); + gpr_atm_rel_store(&g_backup_poller, (gpr_atm)p); + GRPC_CLOSURE_SCHED( + exec_ctx, + GRPC_CLOSURE_INIT(&p->run_poller, run_poller, p, + grpc_executor_scheduler(GRPC_EXECUTOR_LONG)), + GRPC_ERROR_NONE); + } else { + while ((p = (backup_poller *)gpr_atm_acq_load(&g_backup_poller)) == NULL) { + // spin waiting for backup poller + } + } + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "BACKUP_POLLER:%p add %p", p, tcp); + } + grpc_pollset_add_fd(exec_ctx, BACKUP_POLLER_POLLSET(p), tcp->em_fd); + if (old_count != 0) { + drop_uncovered(exec_ctx, tcp); + } +} + +static void notify_on_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_read", tcp); + } + GRPC_CLOSURE_INIT(&tcp->read_done_closure, tcp_handle_read, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_done_closure); +} + +static void notify_on_write(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p notify_on_write", tcp); + } + cover_self(exec_ctx, tcp); + GRPC_CLOSURE_INIT(&tcp->write_done_closure, + tcp_drop_uncovered_then_handle_write, tcp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_done_closure); +} + +static void tcp_drop_uncovered_then_handle_write(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p got_write: %s", arg, grpc_error_string(error)); + } + drop_uncovered(exec_ctx, (grpc_tcp *)arg); + tcp_handle_write(exec_ctx, arg, error); +} + static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { tcp->bytes_read_this_round += (double)bytes; } @@ -214,6 +357,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, grpc_closure *cb = tcp->read_cb; if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p call_cb %p %p:%p", tcp, cb, cb->cb, cb->cb_arg); size_t i; const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "read: error=%s", str); @@ -271,7 +415,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (errno == EAGAIN) { finish_estimate(tcp); /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -307,7 +451,11 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { - grpc_tcp *tcp = tcpp; + grpc_tcp *tcp = (grpc_tcp *)tcpp; + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p read_allocation_done: %s", tcp, + grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, @@ -323,9 +471,15 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { size_t target_read_size = get_target_read_size(tcp); if (tcp->incoming_buffer->length < target_read_size && tcp->incoming_buffer->count < MAX_READ_IOVEC) { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p alloc_slices", tcp); + } grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, target_read_size, 1, tcp->incoming_buffer); } else { + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p do_read", tcp); + } tcp_do_read(exec_ctx, tcp); } } @@ -334,6 +488,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); + if (GRPC_TRACER_ON(grpc_tcp_trace)) { + gpr_log(GPR_DEBUG, "TCP:%p got_read: %s", tcp, grpc_error_string(error)); + } if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer); @@ -357,9 +514,9 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = false; - grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); + notify_on_read(exec_ctx, tcp); } else { - GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_done_closure, GRPC_ERROR_NONE); } } @@ -472,7 +629,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { cb = tcp->write_cb; tcp->write_cb = NULL; @@ -525,7 +682,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "write: delayed"); } - grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); + notify_on_write(exec_ctx, tcp); } else { if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); @@ -602,7 +759,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { grpc_resource_quota_unref_internal(exec_ctx, resource_quota); resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); + (grpc_resource_quota *)channel_args->args[i].value.pointer.p); } } } @@ -631,10 +788,6 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; - GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp, - grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&tcp->last_read_buffer); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); grpc_resource_user_slice_allocator_init( diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 0fc5c0fd86..c3ec3e447a 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -74,7 +74,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server **server) { gpr_once_init(&check_init, init); - grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server)); + grpc_tcp_server *s = (grpc_tcp_server *)gpr_zalloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; s->expand_wildcard_addrs = false; for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { @@ -138,7 +138,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, grpc_error *error) { - grpc_tcp_server *s = server; + grpc_tcp_server *s = (grpc_tcp_server *)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { @@ -197,7 +197,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { - grpc_tcp_listener *sp = arg; + grpc_tcp_listener *sp = (grpc_tcp_listener *)arg; if (err != GRPC_ERROR_NONE) { goto error; @@ -251,7 +251,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); // Create acceptor. - grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor)); + grpc_tcp_server_acceptor *acceptor = + (grpc_tcp_server_acceptor *)gpr_malloc(sizeof(*acceptor)); acceptor->from_server = sp->server; acceptor->port_index = sp->port_index; acceptor->fd_index = sp->fd_index; @@ -365,7 +366,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { listener->server->nports++; grpc_sockaddr_to_string(&addr_str, &listener->addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = (grpc_tcp_listener *)gpr_malloc(sizeof(grpc_tcp_listener)); sp->next = listener->next; listener->next = sp; /* sp (the new listener) is a sibling of 'listener' (the original diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c index ad535bc43e..a828bee074 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.c +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c @@ -93,7 +93,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, gpr_mu_lock(&s->mu); s->nports++; GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp = (grpc_tcp_listener *)gpr_malloc(sizeof(grpc_tcp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.c index a70e3942b2..2648d5da5d 100644 --- a/src/core/lib/iomgr/timer_heap.c +++ b/src/core/lib/iomgr/timer_heap.c @@ -74,8 +74,8 @@ static void maybe_shrink(grpc_timer_heap *heap) { if (heap->timer_count >= 8 && heap->timer_count <= heap->timer_capacity / SHRINK_FULLNESS_FACTOR / 2) { heap->timer_capacity = heap->timer_count * SHRINK_FULLNESS_FACTOR; - heap->timers = - gpr_realloc(heap->timers, heap->timer_capacity * sizeof(grpc_timer *)); + heap->timers = (grpc_timer **)gpr_realloc( + heap->timers, heap->timer_capacity * sizeof(grpc_timer *)); } } @@ -99,8 +99,8 @@ int grpc_timer_heap_add(grpc_timer_heap *heap, grpc_timer *timer) { if (heap->timer_count == heap->timer_capacity) { heap->timer_capacity = GPR_MAX(heap->timer_capacity + 1, heap->timer_capacity * 3 / 2); - heap->timers = - gpr_realloc(heap->timers, heap->timer_capacity * sizeof(grpc_timer *)); + heap->timers = (grpc_timer **)gpr_realloc( + heap->timers, heap->timer_capacity * sizeof(grpc_timer *)); } timer->heap_index = heap->timer_count; adjust_upwards(heap->timers, heap->timer_count, timer); diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 631f7935d9..ae2c0bf0ae 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -83,7 +83,7 @@ static void start_timer_thread_and_unlock(void) { } gpr_thd_options opt = gpr_thd_options_default(); gpr_thd_options_set_joinable(&opt); - completed_thread *ct = gpr_malloc(sizeof(*ct)); + completed_thread *ct = (completed_thread *)gpr_malloc(sizeof(*ct)); // The call to gpr_thd_new() has to be under the same lock used by // gc_completed_threads(), particularly due to ct->t, which is written here // (internally by gpr_thd_new) and read there. Otherwise it's possible for ct diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 88fa34cb7a..9a02c1d1bb 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -125,7 +125,7 @@ static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) { } grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { - grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); + grpc_udp_server *s = (grpc_udp_server *)gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); s->socket_factory = get_socket_factory(args); if (s->socket_factory) { @@ -176,7 +176,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, grpc_error *error) { - grpc_udp_server *s = server; + grpc_udp_server *s = (grpc_udp_server *)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { @@ -237,7 +237,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); - struct shutdown_fd_args *args = gpr_malloc(sizeof(*args)); + struct shutdown_fd_args *args = + (struct shutdown_fd_args *)gpr_malloc(sizeof(*args)); args->fd = sp->emfd; args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, @@ -331,7 +332,7 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_udp_listener *sp = arg; + grpc_udp_listener *sp = (grpc_udp_listener *)arg; gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { @@ -354,7 +355,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_udp_listener *sp = arg; + grpc_udp_listener *sp = (grpc_udp_listener *)arg; gpr_mu_lock(&(sp->server->mu)); if (error != GRPC_ERROR_NONE) { @@ -393,7 +394,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, gpr_free(addr_str); gpr_mu_lock(&s->mu); s->nports++; - sp = gpr_malloc(sizeof(grpc_udp_listener)); + sp = (grpc_udp_listener *)gpr_malloc(sizeof(grpc_udp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -444,7 +445,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, (socklen_t *)&sockname_temp.len)) { port = grpc_sockaddr_get_port(&sockname_temp); if (port > 0) { - allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); + allocated_addr = (grpc_resolved_address *)gpr_malloc( + sizeof(grpc_resolved_address)); memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); grpc_sockaddr_set_port(allocated_addr, port); addr = allocated_addr; diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 0c8627c8c6..35f898f13a 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -49,9 +49,11 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name, gpr_free(err_msg); return err; } - *addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); + *addrs = + (grpc_resolved_addresses *)gpr_malloc(sizeof(grpc_resolved_addresses)); (*addrs)->naddrs = 1; - (*addrs)->addrs = gpr_malloc(sizeof(grpc_resolved_address)); + (*addrs)->addrs = + (grpc_resolved_address *)gpr_malloc(sizeof(grpc_resolved_address)); un = (struct sockaddr_un *)(*addrs)->addrs->addr; un->sun_family = AF_UNIX; strcpy(un->sun_path, name); diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c index 075a0b6426..5e0b1d1704 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -42,7 +42,8 @@ static grpc_error* cv_fd_init(grpc_wakeup_fd* fd_info) { gpr_mu_lock(&g_cvfds.mu); if (!g_cvfds.free_fds) { newsize = GPR_MIN(g_cvfds.size * 2, g_cvfds.size + MAX_TABLE_RESIZE); - g_cvfds.cvfds = gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); + g_cvfds.cvfds = + (fd_node*)gpr_realloc(g_cvfds.cvfds, sizeof(fd_node) * newsize); for (i = g_cvfds.size; i < newsize; i++) { g_cvfds.cvfds[i].is_set = 0; g_cvfds.cvfds[i].cvs = NULL; |