diff options
Diffstat (limited to 'src/core/lib')
35 files changed, 666 insertions, 448 deletions
diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index fc338342e4..9eab1360a4 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -47,7 +47,7 @@ typedef struct { grpc_channel_security_connector base; - tsi_ssl_handshaker_factory *handshaker_factory; + tsi_ssl_client_handshaker_factory *handshaker_factory; char *secure_peer_name; } grpc_httpcli_ssl_channel_security_connector; @@ -56,7 +56,7 @@ static void httpcli_ssl_destroy(grpc_exec_ctx *exec_ctx, grpc_httpcli_ssl_channel_security_connector *c = (grpc_httpcli_ssl_channel_security_connector *)sc; if (c->handshaker_factory != NULL) { - tsi_ssl_handshaker_factory_destroy(c->handshaker_factory); + tsi_ssl_client_handshaker_factory_destroy(c->handshaker_factory); } if (c->secure_peer_name != NULL) gpr_free(c->secure_peer_name); gpr_free(sc); @@ -69,7 +69,7 @@ static void httpcli_ssl_add_handshakers(grpc_exec_ctx *exec_ctx, (grpc_httpcli_ssl_channel_security_connector *)sc; tsi_handshaker *handshaker = NULL; if (c->handshaker_factory != NULL) { - tsi_result result = tsi_ssl_handshaker_factory_create_handshaker( + tsi_result result = tsi_ssl_client_handshaker_factory_create_handshaker( c->handshaker_factory, c->secure_peer_name, &handshaker); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", diff --git a/src/core/lib/iomgr/endpoint_pair.h b/src/core/lib/iomgr/endpoint_pair.h index f9de0c715e..6407a6ad3f 100644 --- a/src/core/lib/iomgr/endpoint_pair.h +++ b/src/core/lib/iomgr/endpoint_pair.h @@ -41,8 +41,7 @@ typedef struct { grpc_endpoint *server; } grpc_endpoint_pair; -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size); +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args); #endif /* GRPC_CORE_LIB_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index b9ff969e81..5542a372d8 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -62,22 +62,25 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]) == GRPC_ERROR_NONE); } -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), resource_quota, - read_slice_size, "socketpair-server"); + p.client = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], final_name), args, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), resource_quota, - read_slice_size, "socketpair-client"); + p.server = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[0], final_name), args, + "socketpair-client"); gpr_free(final_name); + + grpc_exec_ctx_finish(&exec_ctx); return p; } diff --git a/src/core/lib/iomgr/endpoint_pair_uv.c b/src/core/lib/iomgr/endpoint_pair_uv.c index ff24894c6d..9718eb0523 100644 --- a/src/core/lib/iomgr/endpoint_pair_uv.c +++ b/src/core/lib/iomgr/endpoint_pair_uv.c @@ -41,9 +41,8 @@ #include "src/core/lib/iomgr/endpoint_pair.h" -grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + grpc_channel_args *args) { grpc_endpoint_pair endpoint_pair; // TODO(mlumish): implement this properly under libuv GPR_ASSERT(false && diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c index 93f71b745c..25d6264dfb 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.c +++ b/src/core/lib/iomgr/endpoint_pair_windows.c @@ -83,15 +83,18 @@ static void create_sockets(SOCKET sv[2]) { } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair( - const char *name, grpc_resource_quota *resource_quota, - size_t read_slice_size) { + const char *name, grpc_channel_args *channel_args) { SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), - resource_quota, "endpoint:server"); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), - resource_quota, "endpoint:client"); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + p.client = grpc_tcp_create(&exec_ctx, + grpc_winsocket_create(sv[1], "endpoint:client"), + channel_args, "endpoint:server"); + p.server = grpc_tcp_create(&exec_ctx, + grpc_winsocket_create(sv[0], "endpoint:server"), + channel_args, "endpoint:client"); + grpc_exec_ctx_finish(&exec_ctx); return p; } diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 1dbb64e8f3..fbbca6b493 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -212,7 +212,11 @@ static uint8_t get_placement(grpc_error **err, size_t size) { GPR_ASSERT(*err); uint8_t slots = (uint8_t)(size / sizeof(intptr_t)); if ((*err)->arena_size + slots > (*err)->arena_capacity) { - (*err)->arena_capacity = (uint8_t)(3 * (*err)->arena_capacity / 2); + (*err)->arena_capacity = + (uint8_t)GPR_MIN(UINT8_MAX - 1, (3 * (*err)->arena_capacity / 2)); + if ((*err)->arena_size + slots > (*err)->arena_capacity) { + return UINT8_MAX; + } *err = gpr_realloc( *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t)); } @@ -223,10 +227,14 @@ static uint8_t get_placement(grpc_error **err, size_t size) { static void internal_set_int(grpc_error **err, grpc_error_ints which, intptr_t value) { - // GPR_ASSERT((*err)->ints[which] == UINT8_MAX); // TODO, enforce this uint8_t slot = (*err)->ints[which]; if (slot == UINT8_MAX) { slot = get_placement(err, sizeof(value)); + if (slot == UINT8_MAX) { + gpr_log(GPR_ERROR, "Error %p is full, dropping int {\"%s\":%" PRIiPTR "}", + *err, error_int_name(which), value); + return; + } } (*err)->ints[which] = slot; (*err)->arena[slot] = value; @@ -234,10 +242,16 @@ static void internal_set_int(grpc_error **err, grpc_error_ints which, static void internal_set_str(grpc_error **err, grpc_error_strs which, grpc_slice value) { - // GPR_ASSERT((*err)->strs[which] == UINT8_MAX); // TODO, enforce this uint8_t slot = (*err)->strs[which]; if (slot == UINT8_MAX) { slot = get_placement(err, sizeof(value)); + if (slot == UINT8_MAX) { + const char *str = grpc_slice_to_c_string(value); + gpr_log(GPR_ERROR, "Error %p is full, dropping string {\"%s\":\"%s\"}", + *err, error_str_name(which), str); + gpr_free((void *)str); + return; + } } else { unref_slice(*(grpc_slice *)((*err)->arena + slot)); } @@ -245,12 +259,19 @@ static void internal_set_str(grpc_error **err, grpc_error_strs which, memcpy((*err)->arena + slot, &value, sizeof(value)); } +static char *fmt_time(gpr_timespec tm); static void internal_set_time(grpc_error **err, grpc_error_times which, gpr_timespec value) { - // GPR_ASSERT((*err)->times[which] == UINT8_MAX); // TODO, enforce this uint8_t slot = (*err)->times[which]; if (slot == UINT8_MAX) { slot = get_placement(err, sizeof(value)); + if (slot == UINT8_MAX) { + const char *time_str = fmt_time(value); + gpr_log(GPR_ERROR, "Error %p is full, dropping \"%s\":\"%s\"}", *err, + error_time_name(which), time_str); + gpr_free((void *)time_str); + return; + } } (*err)->times[which] = slot; memcpy((*err)->arena + slot, &value, sizeof(value)); @@ -259,6 +280,12 @@ static void internal_set_time(grpc_error **err, grpc_error_times which, static void internal_add_error(grpc_error **err, grpc_error *new) { grpc_linked_error new_last = {new, UINT8_MAX}; uint8_t slot = get_placement(err, sizeof(grpc_linked_error)); + if (slot == UINT8_MAX) { + gpr_log(GPR_ERROR, "Error %p is full, dropping error %p = %s", *err, new, + grpc_error_string(new)); + GRPC_ERROR_UNREF(new); + return; + } if ((*err)->first_err == UINT8_MAX) { GPR_ASSERT((*err)->last_err == UINT8_MAX); (*err)->last_err = slot; diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 7014b98349..4a0f91391f 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -56,6 +56,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" #include "src/core/lib/iomgr/workqueue.h" @@ -141,52 +142,11 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - /* Internally stores data of type (grpc_error *). If the FD is shutdown, this - contains reason for shutdown (i.e a pointer to grpc_error) ORed with - FD_SHUTDOWN_BIT. Since address allocations are word-aligned, the lower bit - of (grpc_error *) addresses is guaranteed to be zero. Even if the - (grpc_error *), is of special types like GRPC_ERROR_NONE, GRPC_ERROR_OOM - etc, the lower bit is guaranteed to be zero. - - Once an fd is shutdown, any pending or future read/write closures on the - fd should fail */ - gpr_atm shutdown_error; - /* The fd is either closed or we relinquished control of it. In either cases, this indicates that the 'fd' on this structure is no longer valid */ bool orphaned; - /* Closures to call when the fd is readable or writable respectively. These - fields contain one of the following values: - CLOSURE_READY : The fd has an I/O event of interest but there is no - closure yet to execute - - CLOSURE_NOT_READY : The fd has no I/O event of interest - - closure ptr : The closure to be executed when the fd has an I/O - event of interest - - shutdown_error | FD_SHUTDOWN_BIT : - 'shutdown_error' field ORed with FD_SHUTDOWN_BIT. - This indicates that the fd is shutdown. Since all - memory allocations are word-aligned, the lower two - bits of the shutdown_error pointer are always 0. So - it is safe to OR these with FD_SHUTDOWN_BIT - - Valid state transitions: - - <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY - | | ^ | ^ | | - | | | | | | | - | +--------------4----------+ 6 +---------2---------------+ | - | | | - | v | - +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+ - - For 1, 4 : See set_ready() function - For 2, 3 : See notify_on() function - For 5,6,7: See set_shutdown() function */ gpr_atm read_closure; gpr_atm write_closure; @@ -218,11 +178,6 @@ static void fd_unref(grpc_fd *fd); static void fd_global_init(void); static void fd_global_shutdown(void); -#define CLOSURE_NOT_READY ((gpr_atm)0) -#define CLOSURE_READY ((gpr_atm)2) - -#define FD_SHUTDOWN_BIT 1 - /******************************************************************************* * Polling island Declarations */ @@ -949,10 +904,8 @@ static void unref_by(grpc_fd *fd, int n) { fd_freelist = fd; grpc_iomgr_unregister_object(&fd->iomgr_object); - grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); - /* Clear the least significant bit if it set (in case fd was shutdown) */ - err = (grpc_error *)((intptr_t)err & ~FD_SHUTDOWN_BIT); - GRPC_ERROR_UNREF(err); + grpc_lfev_destroy(&fd->read_closure); + grpc_lfev_destroy(&fd->write_closure); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -1016,10 +969,9 @@ static grpc_fd *fd_create(int fd, const char *name) { gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - gpr_atm_no_barrier_store(&new_fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE); new_fd->orphaned = false; - gpr_atm_no_barrier_store(&new_fd->read_closure, CLOSURE_NOT_READY); - gpr_atm_no_barrier_store(&new_fd->write_closure, CLOSURE_NOT_READY); + grpc_lfev_init(&new_fd->read_closure); + grpc_lfev_init(&new_fd->write_closure); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = NULL; @@ -1105,153 +1057,6 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_ERROR_UNREF(error); } -static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, - grpc_closure *closure) { - while (true) { - gpr_atm curr = gpr_atm_no_barrier_load(state); - switch (curr) { - case CLOSURE_NOT_READY: { - /* CLOSURE_NOT_READY -> <closure>. - - We're guaranteed by API that there's an acquire barrier before here, - so there's no need to double-dip and this can be a release-only. - - The release itself pairs with the acquire half of a set_ready full - barrier. */ - if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { - return; /* Successful. Return */ - } - - break; /* retry */ - } - - case CLOSURE_READY: { - /* Change the state to CLOSURE_NOT_READY. Schedule the closure if - successful. If not, the state most likely transitioned to shutdown. - We should retry. - - This can be a no-barrier cas since the state is being transitioned to - CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any - closure when transitioning out of CLOSURE_NO_READY state (i.e there - is no other code that needs to 'happen-after' this) */ - if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); - return; /* Successful. Return */ - } - - break; /* retry */ - } - - default: { - /* 'curr' is either a closure or the fd is shutdown(in which case 'curr' - contains a pointer to the shutdown-error). If the fd is shutdown, - schedule the closure with the shutdown error */ - if ((curr & FD_SHUTDOWN_BIT) > 0) { - grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); - grpc_closure_sched(exec_ctx, closure, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "FD Shutdown", &shutdown_err, 1)); - return; - } - - /* There is already a closure!. This indicates a bug in the code */ - gpr_log(GPR_ERROR, - "notify_on called with a previous callback still pending"); - abort(); - } - } - } - - GPR_UNREACHABLE_CODE(return ); -} - -static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state, - grpc_error *shutdown_err) { - gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT; - - while (true) { - gpr_atm curr = gpr_atm_no_barrier_load(state); - switch (curr) { - case CLOSURE_READY: - case CLOSURE_NOT_READY: - /* Need a full barrier here so that the initial load in notify_on - doesn't need a barrier */ - if (gpr_atm_full_cas(state, curr, new_state)) { - return; /* early out */ - } - break; /* retry */ - - default: { - /* 'curr' is either a closure or the fd is already shutdown */ - - /* If fd is already shutdown, we are done */ - if ((curr & FD_SHUTDOWN_BIT) > 0) { - return; - } - - /* Fd is not shutdown. Schedule the closure and move the state to - shutdown state. - Needs an acquire to pair with setting the closure (and get a - happens-after on that edge), and a release to pair with anything - loading the shutdown state. */ - if (gpr_atm_full_cas(state, curr, new_state)) { - grpc_closure_sched(exec_ctx, (grpc_closure *)curr, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "FD Shutdown", &shutdown_err, 1)); - return; - } - - /* 'curr' was a closure but now changed to a different state. We will - have to retry */ - break; - } - } - } - - GPR_UNREACHABLE_CODE(return ); -} - -static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) { - while (true) { - gpr_atm curr = gpr_atm_no_barrier_load(state); - - switch (curr) { - case CLOSURE_READY: { - /* Already ready. We are done here */ - return; - } - - case CLOSURE_NOT_READY: { - /* No barrier required as we're transitioning to a state that does not - involve a closure */ - if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { - return; /* early out */ - } - break; /* retry */ - } - - default: { - /* 'curr' is either a closure or the fd is shutdown */ - if ((curr & FD_SHUTDOWN_BIT) > 0) { - /* The fd is shutdown. Do nothing */ - return; - } - /* Full cas: acquire pairs with this cas' release in the event of a - spurious set_ready; release pairs with this or the acquire in - notify_on (or set_shutdown) */ - else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) { - grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); - return; - } - /* else the state changed again (only possible by either a racing - set_ready or set_shutdown functions. In both these cases, the closure - would have been scheduled for execution. So we are done here */ - return; - } - } - } -} - static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset); @@ -1259,33 +1064,27 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, } static bool fd_is_shutdown(grpc_fd *fd) { - grpc_error *err = (grpc_error *)gpr_atm_acq_load(&fd->shutdown_error); - return (((intptr_t)err & FD_SHUTDOWN_BIT) > 0); + return grpc_lfev_is_shutdown(&fd->read_closure); } /* Might be called multiple times */ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) { - /* Store the shutdown error ORed with FD_SHUTDOWN_BIT in fd->shutdown_error */ - if (gpr_atm_rel_cas(&fd->shutdown_error, (gpr_atm)GRPC_ERROR_NONE, - (gpr_atm)why | FD_SHUTDOWN_BIT)) { + if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure, + GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); - - set_shutdown(exec_ctx, fd, &fd->read_closure, why); - set_shutdown(exec_ctx, fd, &fd->write_closure, why); - } else { - /* Shutdown already called */ - GRPC_ERROR_UNREF(why); + grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why)); } + GRPC_ERROR_UNREF(why); } static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - notify_on(exec_ctx, fd, &fd->read_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - notify_on(exec_ctx, fd, &fd->write_closure, closure); + grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { @@ -1333,7 +1132,7 @@ static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) { if (gpr_atm_no_barrier_cas(&worker->is_kicked, (gpr_atm)0, (gpr_atm)1)) { GRPC_POLLING_TRACE( "pollset_worker_kick: Kicking worker: %p (thread id: %ld)", - (void *)worker, worker->pt_id); + (void *)worker, (long int)worker->pt_id); int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal); if (err_num != 0) { err = GRPC_OS_ERROR(err_num, "pthread_kill"); @@ -1475,7 +1274,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - set_ready(exec_ctx, fd, &fd->read_closure); + grpc_lfev_set_ready(exec_ctx, &fd->read_closure); /* Note, it is possible that fd_become_readable might be called twice with different 'notifier's when an fd becomes readable and it is in two epoll @@ -1487,7 +1286,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - set_ready(exec_ctx, fd, &fd->write_closure); + grpc_lfev_set_ready(exec_ctx, &fd->write_closure); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, @@ -1717,7 +1516,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, worker.pt_id = pthread_self(); gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0); - *worker_hdl = &worker; + if (worker_hdl) *worker_hdl = &worker; gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset); gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); @@ -1795,7 +1594,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->po.mu); } - *worker_hdl = NULL; + if (worker_hdl) *worker_hdl = NULL; gpr_tls_set(&g_current_thread_pollset, (intptr_t)0); gpr_tls_set(&g_current_thread_worker, (intptr_t)0); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index d90f223362..9834cdd197 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -871,7 +871,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; - *worker_hdl = &worker; + if (worker_hdl) *worker_hdl = &worker; grpc_error *error = GRPC_ERROR_NONE; /* Avoid malloc for small number of elements. */ @@ -1092,7 +1092,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } - *worker_hdl = NULL; + if (worker_hdl) *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error)); return error; diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index b5be5504b9..13409a4de8 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -111,6 +111,12 @@ static void try_engine(const char *engine) { } } +/* This should be used for testing purposes ONLY */ +void grpc_set_event_engine_test_only( + const grpc_event_engine_vtable *ev_engine) { + g_event_engine = ev_engine; +} + /* Call this only after calling grpc_event_engine_init() */ const char *grpc_get_poll_strategy_name() { return g_poll_strategy_name; } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1a9e5c115a..becc4d359e 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -183,4 +183,7 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; +/* This should be used for testing purposes ONLY */ +void grpc_set_event_engine_test_only(const grpc_event_engine_vtable *); + #endif /* GRPC_CORE_LIB_IOMGR_EV_POSIX_H */ diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c new file mode 100644 index 0000000000..17e3bbf727 --- /dev/null +++ b/src/core/lib/iomgr/lockfree_event.c @@ -0,0 +1,238 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/lockfree_event.h" + +#include <grpc/support/log.h> + +/* 'state' holds the to call when the fd is readable or writable respectively. + It can contain one of the following values: + CLOSURE_READY : The fd has an I/O event of interest but there is no + closure yet to execute + + CLOSURE_NOT_READY : The fd has no I/O event of interest + + closure ptr : The closure to be executed when the fd has an I/O + event of interest + + shutdown_error | FD_SHUTDOWN_BIT : + 'shutdown_error' field ORed with FD_SHUTDOWN_BIT. + This indicates that the fd is shutdown. Since all + memory allocations are word-aligned, the lower two + bits of the shutdown_error pointer are always 0. So + it is safe to OR these with FD_SHUTDOWN_BIT + + Valid state transitions: + + <closure ptr> <-----3------ CLOSURE_NOT_READY ----1----> CLOSURE_READY + | | ^ | ^ | | + | | | | | | | + | +--------------4----------+ 6 +---------2---------------+ | + | | | + | v | + +-----5-------> [shutdown_error | FD_SHUTDOWN_BIT] <----7---------+ + + For 1, 4 : See grpc_lfev_set_ready() function + For 2, 3 : See grpc_lfev_notify_on() function + For 5,6,7: See grpc_lfev_set_shutdown() function */ + +#define CLOSURE_NOT_READY ((gpr_atm)0) +#define CLOSURE_READY ((gpr_atm)2) + +#define FD_SHUTDOWN_BIT ((gpr_atm)1) + +void grpc_lfev_init(gpr_atm *state) { + gpr_atm_no_barrier_store(state, CLOSURE_NOT_READY); +} + +void grpc_lfev_destroy(gpr_atm *state) { + gpr_atm curr = gpr_atm_no_barrier_load(state); + if (curr & FD_SHUTDOWN_BIT) { + GRPC_ERROR_UNREF((grpc_error *)(curr & ~FD_SHUTDOWN_BIT)); + } else { + GPR_ASSERT(curr == CLOSURE_NOT_READY || curr == CLOSURE_READY); + } +} + +bool grpc_lfev_is_shutdown(gpr_atm *state) { + gpr_atm curr = gpr_atm_no_barrier_load(state); + return (curr & FD_SHUTDOWN_BIT) != 0; +} + +void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, + grpc_closure *closure) { + while (true) { + gpr_atm curr = gpr_atm_no_barrier_load(state); + switch (curr) { + case CLOSURE_NOT_READY: { + /* CLOSURE_NOT_READY -> <closure>. + + We're guaranteed by API that there's an acquire barrier before here, + so there's no need to double-dip and this can be a release-only. + + The release itself pairs with the acquire half of a set_ready full + barrier. */ + if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) { + return; /* Successful. Return */ + } + + break; /* retry */ + } + + case CLOSURE_READY: { + /* Change the state to CLOSURE_NOT_READY. Schedule the closure if + successful. If not, the state most likely transitioned to shutdown. + We should retry. + + This can be a no-barrier cas since the state is being transitioned to + CLOSURE_NOT_READY; set_ready and set_shutdown do not schedule any + closure when transitioning out of CLOSURE_NO_READY state (i.e there + is no other code that needs to 'happen-after' this) */ + if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + return; /* Successful. Return */ + } + + break; /* retry */ + } + + default: { + /* 'curr' is either a closure or the fd is shutdown(in which case 'curr' + contains a pointer to the shutdown-error). If the fd is shutdown, + schedule the closure with the shutdown error */ + if ((curr & FD_SHUTDOWN_BIT) > 0) { + grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); + grpc_closure_sched(exec_ctx, closure, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); + return; + } + + /* There is already a closure!. This indicates a bug in the code */ + gpr_log(GPR_ERROR, + "notify_on called with a previous callback still pending"); + abort(); + } + } + } + + GPR_UNREACHABLE_CODE(return ); +} + +bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, + grpc_error *shutdown_err) { + gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT; + + while (true) { + gpr_atm curr = gpr_atm_no_barrier_load(state); + switch (curr) { + case CLOSURE_READY: + case CLOSURE_NOT_READY: + /* Need a full barrier here so that the initial load in notify_on + doesn't need a barrier */ + if (gpr_atm_full_cas(state, curr, new_state)) { + return true; /* early out */ + } + break; /* retry */ + + default: { + /* 'curr' is either a closure or the fd is already shutdown */ + + /* If fd is already shutdown, we are done */ + if ((curr & FD_SHUTDOWN_BIT) > 0) { + GRPC_ERROR_UNREF(shutdown_err); + return false; + } + + /* Fd is not shutdown. Schedule the closure and move the state to + shutdown state. + Needs an acquire to pair with setting the closure (and get a + happens-after on that edge), and a release to pair with anything + loading the shutdown state. */ + if (gpr_atm_full_cas(state, curr, new_state)) { + grpc_closure_sched(exec_ctx, (grpc_closure *)curr, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "FD Shutdown", &shutdown_err, 1)); + return true; + } + + /* 'curr' was a closure but now changed to a different state. We will + have to retry */ + break; + } + } + } + + GPR_UNREACHABLE_CODE(return false); +} + +void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) { + while (true) { + gpr_atm curr = gpr_atm_no_barrier_load(state); + + switch (curr) { + case CLOSURE_READY: { + /* Already ready. We are done here */ + return; + } + + case CLOSURE_NOT_READY: { + /* No barrier required as we're transitioning to a state that does not + involve a closure */ + if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) { + return; /* early out */ + } + break; /* retry */ + } + + default: { + /* 'curr' is either a closure or the fd is shutdown */ + if ((curr & FD_SHUTDOWN_BIT) > 0) { + /* The fd is shutdown. Do nothing */ + return; + } + /* Full cas: acquire pairs with this cas' release in the event of a + spurious set_ready; release pairs with this or the acquire in + notify_on (or set_shutdown) */ + else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) { + grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); + return; + } + /* else the state changed again (only possible by either a racing + set_ready or set_shutdown functions. In both these cases, the closure + would have been scheduled for execution. So we are done here */ + return; + } + } + } +} diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h new file mode 100644 index 0000000000..1d9119204c --- /dev/null +++ b/src/core/lib/iomgr/lockfree_event.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H +#define GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H + +/* Lock free event notification for file descriptors */ + +#include <grpc/support/atm.h> + +#include "src/core/lib/iomgr/exec_ctx.h" + +void grpc_lfev_init(gpr_atm *state); +void grpc_lfev_destroy(gpr_atm *state); +bool grpc_lfev_is_shutdown(gpr_atm *state); + +void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, + grpc_closure *closure); +/* Returns true on first successful shutdown */ +bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, + grpc_error *shutdown_err); +void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state); + +#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */ diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index e19ce697b8..9bf3cdac89 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -75,6 +75,10 @@ void grpc_pollset_destroy(grpc_pollset *pollset); and it is guaranteed that it will not be released by grpc_pollset_work AFTER worker has been destroyed. + It's legal for worker to be NULL: in that case, this specific thread can not + be directly woken with a kick, but maybe be indirectly (with a kick against + the pollset as a whole). + Tries not to block past deadline. May call grpc_closure_list_run on grpc_closure_list, without holding the pollset diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 17043c1ea1..04c6b71747 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -120,7 +120,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; - *worker_hdl = &worker; + if (worker_hdl) *worker_hdl = &worker; int added_worker = 0; worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = @@ -185,7 +185,7 @@ done: remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } gpr_cv_destroy(&worker.cv); - *worker_hdl = NULL; + if (worker_hdl) *worker_hdl = NULL; return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 94a454c0b7..269dc35003 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -85,6 +85,10 @@ #define GRPC_LINUX_SOCKETUTILS 1 #endif #endif +#ifndef __GLIBC__ +#define GRPC_LINUX_EPOLL 1 +#define GRPC_LINUX_EVENTFD 1 +#endif #ifndef GRPC_LINUX_EVENTFD #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 #endif diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 8dcd80d001..c3ee878651 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -142,6 +142,8 @@ struct grpc_resource_quota { /* Amount of free memory in the resource quota */ int64_t free_pool; + gpr_atm last_size; + /* Has rq_step been scheduled to occur? */ bool step_scheduled; /* Are we currently reclaiming memory */ @@ -581,6 +583,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { resource_quota->combiner = grpc_combiner_create(NULL); resource_quota->free_pool = INT64_MAX; resource_quota->size = INT64_MAX; + gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX); resource_quota->step_scheduled = false; resource_quota->reclaiming = false; gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0); @@ -643,11 +646,17 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, rq_resize_args *a = gpr_malloc(sizeof(*a)); a->resource_quota = grpc_resource_quota_ref_internal(resource_quota); a->size = (int64_t)size; + gpr_atm_no_barrier_store(&resource_quota->last_size, + (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size)); grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } +size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota) { + return (size_t)gpr_atm_no_barrier_load(&resource_quota->last_size); +} + /******************************************************************************* * grpc_resource_user channel args api */ diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index b9f62cbf83..6f99be0d51 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -90,6 +90,8 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args( double grpc_resource_quota_get_memory_pressure( grpc_resource_quota *resource_quota); +size_t grpc_resource_quota_peek_size(grpc_resource_quota *resource_quota); + typedef struct grpc_resource_user grpc_resource_user; grpc_resource_user *grpc_resource_user_create( diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index 0485661316..bc367bdfa5 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -40,10 +40,6 @@ #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolve_address.h" -/* Channel arg (integer) setting how large a slice to try and read from the wire - each time recvmsg (or equivalent) is called */ -#define GRPC_ARG_TCP_READ_CHUNK_SIZE "grpc.experimental.tcp_read_chunk_size" - /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and NULL on failure). diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index a108b10da6..a2692707d9 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -137,29 +137,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { grpc_endpoint *grpc_tcp_client_create_from_fd( grpc_exec_ctx *exec_ctx, grpc_fd *fd, const grpc_channel_args *channel_args, const char *addr_str) { - size_t tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; - grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); - if (channel_args != NULL) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { - grpc_integer_options options = {(int)tcp_read_chunk_size, 1, - 8 * 1024 * 1024}; - tcp_read_chunk_size = (size_t)grpc_channel_arg_get_integer( - &channel_args->args[i], options); - } else if (0 == - strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); - } - } - } - - grpc_endpoint *ep = - grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str); - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - return ep; + return grpc_tcp_create(exec_ctx, fd, channel_args, addr_str); } static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index a356564766..d6baca50ba 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -63,7 +63,7 @@ typedef struct { int refs; grpc_closure on_connect; grpc_endpoint **endpoint; - grpc_resource_quota *resource_quota; + grpc_channel_args *channel_args; } async_connect; static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, @@ -72,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx, int done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { - grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota); + grpc_channel_args_destroy(exec_ctx, ac->channel_args); gpr_mu_destroy(&ac->mu); gpr_free(ac->addr_name); gpr_free(ac); @@ -119,7 +119,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { if (!wsa_success) { error = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx"); } else { - *ep = grpc_tcp_create(socket, ac->resource_quota, ac->addr_name); + *ep = + grpc_tcp_create(exec_ctx, socket, ac->channel_args, ac->addr_name); socket = NULL; } } else { @@ -152,17 +153,6 @@ static void tcp_client_connect_impl( grpc_winsocket_callback_info *info; grpc_error *error = GRPC_ERROR_NONE; - grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); - if (channel_args != NULL) { - for (size_t i = 0; i < channel_args->num_args; i++) { - if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); - resource_quota = grpc_resource_quota_ref_internal( - channel_args->args[i].value.pointer.p); - } - } - } - *endpoint = NULL; /* Use dualstack sockets where available. */ @@ -225,7 +215,7 @@ static void tcp_client_connect_impl( ac->refs = 2; ac->addr_name = grpc_sockaddr_to_uri(addr); ac->endpoint = endpoint; - ac->resource_quota = resource_quota; + ac->channel_args = grpc_channel_args_copy(channel_args); grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); @@ -247,7 +237,6 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_resource_quota_unref_internal(exec_ctx, resource_quota); grpc_closure_sched(exec_ctx, on_done, final_error); } diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 4d7cf3ff51..5f4b38de2b 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -52,7 +52,9 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/profiling/timers.h" @@ -80,10 +82,14 @@ typedef struct { int fd; bool finished_edge; msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */ - size_t slice_size; + double target_length; + double bytes_read_this_round; gpr_refcount refcount; gpr_atm shutdown_count; + int min_read_chunk_size; + int max_read_chunk_size; + /* garbage after the last read */ grpc_slice_buffer last_read_buffer; @@ -108,6 +114,42 @@ typedef struct { grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; +static void add_to_estimate(grpc_tcp *tcp, size_t bytes) { + tcp->bytes_read_this_round += (double)bytes; +} + +static void finish_estimate(grpc_tcp *tcp) { + /* If we read >80% of the target buffer in one read loop, increase the size + of the target buffer to either the amount read, or twice its previous + value */ + if (tcp->bytes_read_this_round > tcp->target_length * 0.8) { + tcp->target_length = + GPR_MAX(2 * tcp->target_length, tcp->bytes_read_this_round); + } else { + tcp->target_length = + 0.99 * tcp->target_length + 0.01 * tcp->bytes_read_this_round; + } + tcp->bytes_read_this_round = 0; +} + +static size_t get_target_read_size(grpc_tcp *tcp) { + grpc_resource_quota *rq = grpc_resource_user_quota(tcp->resource_user); + double pressure = grpc_resource_quota_get_memory_pressure(rq); + double target = + tcp->target_length * (pressure > 0.8 ? (1.0 - pressure) / 0.2 : 1.0); + size_t sz = (((size_t)GPR_CLAMP(target, tcp->min_read_chunk_size, + tcp->max_read_chunk_size)) + + 255) & + ~(size_t)255; + /* don't use more than 1/16th of the overall resource quota for a single read + * alloc */ + size_t rqmax = grpc_resource_quota_peek_size(rq); + if (sz > rqmax / 16 && rqmax > 1024) { + sz = rqmax / 16; + } + return sz; +} + static grpc_error *tcp_annotate_error(grpc_error *src_error, grpc_tcp *tcp) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_FD, tcp->fd), @@ -232,9 +274,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { /* NB: After calling call_read_cb a parallel call of the read handler may * be running. */ if (errno == EAGAIN) { - if (tcp->iov_size > 1) { - tcp->iov_size /= 2; - } + finish_estimate(tcp); /* We've consumed the edge, request a new one */ grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { @@ -253,14 +293,13 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp)); TCP_UNREF(exec_ctx, tcp, "read"); } else { + add_to_estimate(tcp, (size_t)read_bytes); GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { grpc_slice_buffer_trim_end( tcp->incoming_buffer, tcp->incoming_buffer->length - (size_t)read_bytes, &tcp->last_read_buffer); - } else if (tcp->iov_size < MAX_READ_IOVEC) { - ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); call_read_cb(exec_ctx, tcp, GRPC_ERROR_NONE); @@ -285,11 +324,11 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp, } static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { - if (tcp->incoming_buffer->count < (size_t)tcp->iov_size) { - grpc_resource_user_alloc_slices( - exec_ctx, &tcp->slice_allocator, tcp->slice_size, - (size_t)tcp->iov_size - tcp->incoming_buffer->count, - tcp->incoming_buffer); + size_t target_read_size = get_target_read_size(tcp); + if (tcp->incoming_buffer->length < target_read_size && + tcp->incoming_buffer->count < MAX_READ_IOVEC) { + grpc_resource_user_alloc_slices(exec_ctx, &tcp->slice_allocator, + target_read_size, 1, tcp->incoming_buffer); } else { tcp_do_read(exec_ctx, tcp); } @@ -540,9 +579,50 @@ static const grpc_endpoint_vtable vtable = {tcp_read, tcp_get_peer, tcp_get_fd}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, - grpc_resource_quota *resource_quota, - size_t slice_size, const char *peer_string) { +#define MAX_CHUNK_SIZE 32 * 1024 * 1024 + +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, + const grpc_channel_args *channel_args, + const char *peer_string) { + int tcp_read_chunk_size = GRPC_TCP_DEFAULT_READ_SLICE_SIZE; + int tcp_max_read_chunk_size = 4 * 1024 * 1024; + int tcp_min_read_chunk_size = 256; + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_TCP_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_min_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE)) { + grpc_integer_options options = {(int)tcp_read_chunk_size, 1, + MAX_CHUNK_SIZE}; + tcp_max_read_chunk_size = + grpc_channel_arg_get_integer(&channel_args->args[i], options); + } else if (0 == + strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + channel_args->args[i].value.pointer.p); + } + } + } + + if (tcp_min_read_chunk_size > tcp_max_read_chunk_size) { + tcp_min_read_chunk_size = tcp_max_read_chunk_size; + } + tcp_read_chunk_size = GPR_CLAMP(tcp_read_chunk_size, tcp_min_read_chunk_size, + tcp_max_read_chunk_size); + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->peer_string = gpr_strdup(peer_string); @@ -552,7 +632,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->release_fd_cb = NULL; tcp->release_fd = NULL; tcp->incoming_buffer = NULL; - tcp->slice_size = slice_size; + tcp->target_length = (double)tcp_read_chunk_size; + tcp->min_read_chunk_size = tcp_min_read_chunk_size; + tcp->max_read_chunk_size = tcp_max_read_chunk_size; + tcp->bytes_read_this_round = 0; tcp->iov_size = 1; tcp->finished_edge = true; /* paired with unref in grpc_tcp_destroy */ @@ -569,6 +652,7 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); return &tcp->base; } diff --git a/src/core/lib/iomgr/tcp_posix.h b/src/core/lib/iomgr/tcp_posix.h index 1c0d13f96e..1ad5788331 100644 --- a/src/core/lib/iomgr/tcp_posix.h +++ b/src/core/lib/iomgr/tcp_posix.h @@ -47,14 +47,13 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" -#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 - extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, grpc_resource_quota *resource_quota, - size_t read_slice_size, const char *peer_string); +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *fd, + const grpc_channel_args *args, + const char *peer_string); /* Return the tcp endpoint's fd, or -1 if this is not available. Does not release the fd. diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d6a017cf7f..e66ffc9b1c 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -59,6 +59,7 @@ #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -90,7 +91,6 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; - s->resource_quota = grpc_resource_quota_create(NULL); s->expand_wildcard_addrs = false; for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { @@ -98,27 +98,14 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->so_reuseport = has_so_reuseport && (args->args[i].value.integer != 0); } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT " must be an integer"); } - } else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - s->resource_quota = - grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); - } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); - } } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { s->expand_wildcard_addrs = (args->args[i].value.integer != 0); } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); gpr_free(s); return GRPC_ERROR_CREATE_FROM_STATIC_STRING( GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer"); @@ -138,6 +125,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, s->head = NULL; s->tail = NULL; s->nports = 0; + s->channel_args = grpc_channel_args_copy(args); gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0); *server = s; return GRPC_ERROR_NONE; @@ -158,8 +146,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { s->head = sp->next; gpr_free(sp); } - - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); + grpc_channel_args_destroy(exec_ctx, s->channel_args); gpr_free(s); } @@ -286,8 +273,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { sp->server->on_accept_cb( exec_ctx, sp->server->on_accept_cb_arg, - grpc_tcp_create(fdobj, sp->server->resource_quota, - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + grpc_tcp_create(exec_ctx, fdobj, sp->server->channel_args, addr_str), read_notifier_pollset, acceptor); gpr_free(name); diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h index f5dc8532f9..c15e2e1493 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix.h +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -103,7 +103,8 @@ struct grpc_tcp_server { /* next pollset to assign a channel to */ gpr_atm next_pollset_to_assign; - grpc_resource_quota *resource_quota; + /* channel args for this server */ + grpc_channel_args *channel_args; }; /* If successful, add a listener to \a s for \a addr, set \a dsmode for the diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 12ce7d3fdd..4c17f08918 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -46,6 +46,7 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/pollset_windows.h" #include "src/core/lib/iomgr/resolve_address.h" @@ -102,7 +103,7 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure *shutdown_complete; - grpc_resource_quota *resource_quota; + grpc_channel_args *channel_args; }; /* Public function. Allocates the proper data structures to hold a @@ -112,21 +113,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, const grpc_channel_args *args, grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); - s->resource_quota = grpc_resource_quota_create(NULL); - for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { - if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { - if (args->args[i].type == GRPC_ARG_POINTER) { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - s->resource_quota = - grpc_resource_quota_ref_internal(args->args[i].value.pointer.p); - } else { - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); - gpr_free(s); - return GRPC_ERROR_CREATE_FROM_STATIC_STRING( - GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); - } - } - } + s->channel_args = grpc_channel_args_copy(args); gpr_ref_init(&s->refs, 1); gpr_mu_init(&s->mu); s->active_ports = 0; @@ -155,7 +142,7 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, grpc_winsocket_destroy(sp->socket); gpr_free(sp); } - grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); + grpc_channel_args_destroy(exec_ctx, s->channel_args); gpr_free(s); } @@ -383,8 +370,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_free(utf8_message); } gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); - ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), - sp->server->resource_quota, peer_name_string); + ep = grpc_tcp_create(exec_ctx, grpc_winsocket_create(sock, fd_name), + sp->server->channel_args, peer_name_string); gpr_free(fd_name); gpr_free(peer_name_string); } else { diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 9134883226..f74aa68793 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -430,9 +430,19 @@ static grpc_endpoint_vtable vtable = {win_read, win_get_peer, win_get_fd}; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, - grpc_resource_quota *resource_quota, +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, + grpc_channel_args *channel_args, char *peer_string) { + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_ref_internal( + channel_args->args[i].value.pointer.p); + } + } + } grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; diff --git a/src/core/lib/iomgr/tcp_windows.h b/src/core/lib/iomgr/tcp_windows.h index 4402de1c38..abafdb22d2 100644 --- a/src/core/lib/iomgr/tcp_windows.h +++ b/src/core/lib/iomgr/tcp_windows.h @@ -50,8 +50,8 @@ /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, - grpc_resource_quota *resource_quota, +grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, + grpc_channel_args *channel_args, char *peer_string); grpc_error *grpc_tcp_prepare_socket(SOCKET sock); diff --git a/src/core/lib/security/credentials/credentials.h b/src/core/lib/security/credentials/credentials.h index 510b79552a..89b8e3c0b3 100644 --- a/src/core/lib/security/credentials/credentials.h +++ b/src/core/lib/security/credentials/credentials.h @@ -71,7 +71,7 @@ typedef enum { #define GRPC_SECURE_TOKEN_REFRESH_THRESHOLD_SECS 60 -#define GRPC_COMPUTE_ENGINE_METADATA_HOST "metadata" +#define GRPC_COMPUTE_ENGINE_METADATA_HOST "metadata.google.internal" #define GRPC_COMPUTE_ENGINE_METADATA_TOKEN_PATH \ "/computeMetadata/v1/instance/service-accounts/default/token" diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 2b51706161..dbe3263f92 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -448,14 +448,14 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create( typedef struct { grpc_channel_security_connector base; - tsi_ssl_handshaker_factory *handshaker_factory; + tsi_ssl_client_handshaker_factory *handshaker_factory; char *target_name; char *overridden_target_name; } grpc_ssl_channel_security_connector; typedef struct { grpc_server_security_connector base; - tsi_ssl_handshaker_factory *handshaker_factory; + tsi_ssl_server_handshaker_factory *handshaker_factory; } grpc_ssl_server_security_connector; static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx, @@ -464,7 +464,7 @@ static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx, (grpc_ssl_channel_security_connector *)sc; grpc_call_credentials_unref(exec_ctx, c->base.request_metadata_creds); if (c->handshaker_factory != NULL) { - tsi_ssl_handshaker_factory_destroy(c->handshaker_factory); + tsi_ssl_client_handshaker_factory_destroy(c->handshaker_factory); } if (c->target_name != NULL) gpr_free(c->target_name); if (c->overridden_target_name != NULL) gpr_free(c->overridden_target_name); @@ -476,26 +476,11 @@ static void ssl_server_destroy(grpc_exec_ctx *exec_ctx, grpc_ssl_server_security_connector *c = (grpc_ssl_server_security_connector *)sc; if (c->handshaker_factory != NULL) { - tsi_ssl_handshaker_factory_destroy(c->handshaker_factory); + tsi_ssl_server_handshaker_factory_destroy(c->handshaker_factory); } gpr_free(sc); } -static grpc_security_status ssl_create_handshaker( - tsi_ssl_handshaker_factory *handshaker_factory, bool is_client, - const char *peer_name, tsi_handshaker **handshaker) { - tsi_result result = TSI_OK; - if (handshaker_factory == NULL) return GRPC_SECURITY_ERROR; - result = tsi_ssl_handshaker_factory_create_handshaker( - handshaker_factory, is_client ? peer_name : NULL, handshaker); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", - tsi_result_to_string(result)); - return GRPC_SECURITY_ERROR; - } - return GRPC_SECURITY_OK; -} - static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, grpc_channel_security_connector *sc, grpc_handshake_manager *handshake_mgr) { @@ -503,11 +488,17 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx, (grpc_ssl_channel_security_connector *)sc; // Instantiate TSI handshaker. tsi_handshaker *tsi_hs = NULL; - ssl_create_handshaker(c->handshaker_factory, true /* is_client */, - c->overridden_target_name != NULL - ? c->overridden_target_name - : c->target_name, - &tsi_hs); + tsi_result result = tsi_ssl_client_handshaker_factory_create_handshaker( + c->handshaker_factory, + c->overridden_target_name != NULL ? c->overridden_target_name + : c->target_name, + &tsi_hs); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", + tsi_result_to_string(result)); + return; + } + // Create handshakers. grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( exec_ctx, tsi_hs, &sc->base)); @@ -520,8 +511,14 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx, (grpc_ssl_server_security_connector *)sc; // Instantiate TSI handshaker. tsi_handshaker *tsi_hs = NULL; - ssl_create_handshaker(c->handshaker_factory, false /* is_client */, - NULL /* peer_name */, &tsi_hs); + tsi_result result = tsi_ssl_server_handshaker_factory_create_handshaker( + c->handshaker_factory, &tsi_hs); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Handshaker creation failed with error %s.", + tsi_result_to_string(result)); + return; + } + // Create handshakers. grpc_handshake_manager_add(handshake_mgr, grpc_security_handshaker_create( exec_ctx, tsi_hs, &sc->base)); diff --git a/src/core/lib/slice/slice_buffer.c b/src/core/lib/slice/slice_buffer.c index 9176dc8a42..c96b9c3b28 100644 --- a/src/core/lib/slice/slice_buffer.c +++ b/src/core/lib/slice/slice_buffer.c @@ -46,27 +46,29 @@ #define GROW(x) (3 * (x) / 2) static void maybe_embiggen(grpc_slice_buffer *sb) { - if (sb->base_slices != sb->slices) { - memmove(sb->base_slices, sb->slices, sb->count * sizeof(grpc_slice)); - sb->slices = sb->base_slices; - } - /* How far away from sb->base_slices is sb->slices pointer */ size_t slice_offset = (size_t)(sb->slices - sb->base_slices); size_t slice_count = sb->count + slice_offset; if (slice_count == sb->capacity) { - sb->capacity = GROW(sb->capacity); - GPR_ASSERT(sb->capacity > slice_count); - if (sb->base_slices == sb->inlined) { - sb->base_slices = gpr_malloc(sb->capacity * sizeof(grpc_slice)); - memcpy(sb->base_slices, sb->inlined, slice_count * sizeof(grpc_slice)); + if (sb->base_slices != sb->slices) { + /* Make room by moving elements if there's still space unused */ + memmove(sb->base_slices, sb->slices, sb->count * sizeof(grpc_slice)); + sb->slices = sb->base_slices; } else { - sb->base_slices = - gpr_realloc(sb->base_slices, sb->capacity * sizeof(grpc_slice)); - } + /* Allocate more memory if no more space is available */ + sb->capacity = GROW(sb->capacity); + GPR_ASSERT(sb->capacity > slice_count); + if (sb->base_slices == sb->inlined) { + sb->base_slices = gpr_malloc(sb->capacity * sizeof(grpc_slice)); + memcpy(sb->base_slices, sb->inlined, slice_count * sizeof(grpc_slice)); + } else { + sb->base_slices = + gpr_realloc(sb->base_slices, sb->capacity * sizeof(grpc_slice)); + } - sb->slices = sb->base_slices + slice_offset; + sb->slices = sb->base_slices + slice_offset; + } } } diff --git a/src/core/lib/support/cpu_linux.c b/src/core/lib/support/cpu_linux.c index d6f7e7d3da..1e50f59823 100644 --- a/src/core/lib/support/cpu_linux.c +++ b/src/core/lib/support/cpu_linux.c @@ -67,12 +67,17 @@ unsigned gpr_cpu_num_cores(void) { } unsigned gpr_cpu_current_cpu(void) { +#ifdef __GLIBC__ int cpu = sched_getcpu(); if (cpu < 0) { gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); return 0; } return (unsigned)cpu; +#else + // sched_getcpu() is undefined on musl + return 0; +#endif } #endif /* GPR_CPU_LINUX */ diff --git a/src/core/lib/support/wrap_memcpy.c b/src/core/lib/support/wrap_memcpy.c index 15c289f7b8..050cc6db5e 100644 --- a/src/core/lib/support/wrap_memcpy.c +++ b/src/core/lib/support/wrap_memcpy.c @@ -40,7 +40,7 @@ */ #ifdef __linux__ -#ifdef __x86_64__ +#if defined(__x86_64__) && defined(__GNU_LIBRARY__) __asm__(".symver memcpy,memcpy@GLIBC_2.2.5"); void *__wrap_memcpy(void *destination, const void *source, size_t num) { return memcpy(destination, source, num); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6898d7ccb4..6a8ae03a21 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -145,17 +145,29 @@ typedef struct batch_control { grpc_transport_stream_op_batch op; } batch_control; +typedef struct { + gpr_mu child_list_mu; + grpc_call *first_child; +} parent_call; + +typedef struct { + grpc_call *parent; + /** siblings: children of the same parent form a list, and this list is + protected under + parent->mu */ + grpc_call *sibling_next; + grpc_call *sibling_prev; +} child_call; + struct grpc_call { gpr_refcount ext_ref; gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; - grpc_call *parent; - grpc_call *first_child; gpr_timespec start_time; - /* protects first_child, and child next/prev links */ - gpr_mu child_list_mu; + /* parent_call* */ gpr_atm parent_call_atm; + child_call *child_call; /* client or server call */ bool is_client; @@ -207,12 +219,6 @@ struct grpc_call { int send_extra_metadata_count; gpr_timespec send_deadline; - /** siblings: children of the same parent form a list, and this list is - protected under - parent->mu */ - grpc_call *sibling_next; - grpc_call *sibling_prev; - grpc_slice_buffer_stream sending_stream; grpc_byte_stream *receiving_stream; @@ -281,6 +287,23 @@ void *grpc_call_arena_alloc(grpc_call *call, size_t size) { return gpr_arena_alloc(call->arena, size); } +static parent_call *get_or_create_parent_call(grpc_call *call) { + parent_call *p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); + if (p == NULL) { + p = gpr_arena_alloc(call->arena, sizeof(*p)); + gpr_mu_init(&p->child_list_mu); + if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm)NULL, (gpr_atm)p)) { + gpr_mu_destroy(&p->child_list_mu); + p = (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); + } + } + return p; +} + +static parent_call *get_parent_call(grpc_call *call) { + return (parent_call *)gpr_atm_acq_load(&call->parent_call_atm); +} + grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, const grpc_call_create_args *args, grpc_call **out_call) { @@ -297,10 +320,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_ref_init(&call->ext_ref, 1); call->arena = arena; *out_call = call; - gpr_mu_init(&call->child_list_mu); call->channel = args->channel; call->cq = args->cq; - call->parent = args->parent_call; call->start_time = gpr_now(GPR_CLOCK_MONOTONIC); /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); @@ -332,11 +353,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, gpr_convert_clock_type(args->send_deadline, GPR_CLOCK_MONOTONIC); if (args->parent_call != NULL) { + child_call *cc = call->child_call = + gpr_arena_alloc(arena, sizeof(child_call)); + call->child_call->parent = args->parent_call; + GRPC_CALL_INTERNAL_REF(args->parent_call, "child"); GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent_call->is_client); - gpr_mu_lock(&args->parent_call->child_list_mu); + parent_call *pc = get_or_create_parent_call(args->parent_call); + + gpr_mu_lock(&pc->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -370,17 +397,17 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } } - if (args->parent_call->first_child == NULL) { - args->parent_call->first_child = call; - call->sibling_next = call->sibling_prev = call; + if (pc->first_child == NULL) { + pc->first_child = call; + cc->sibling_next = cc->sibling_prev = call; } else { - call->sibling_next = args->parent_call->first_child; - call->sibling_prev = args->parent_call->first_child->sibling_prev; - call->sibling_next->sibling_prev = call->sibling_prev->sibling_next = - call; + cc->sibling_next = pc->first_child; + cc->sibling_prev = pc->first_child->child_call->sibling_prev; + cc->sibling_next->child_call->sibling_prev = + cc->sibling_prev->child_call->sibling_next = call; } - gpr_mu_unlock(&args->parent_call->child_list_mu); + gpr_mu_unlock(&pc->child_list_mu); } call->send_deadline = send_deadline; @@ -475,7 +502,10 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - gpr_mu_destroy(&c->child_list_mu); + parent_call *pc = get_parent_call(c); + if (pc != NULL) { + gpr_mu_destroy(&pc->child_list_mu); + } for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -509,31 +539,31 @@ void grpc_call_ref(grpc_call *c) { gpr_ref(&c->ext_ref); } void grpc_call_unref(grpc_call *c) { if (!gpr_unref(&c->ext_ref)) return; - int cancel; - grpc_call *parent = c->parent; + child_call *cc = c->child_call; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_call_unref", 0); GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c)); - if (parent) { - gpr_mu_lock(&parent->child_list_mu); - if (c == parent->first_child) { - parent->first_child = c->sibling_next; - if (c == parent->first_child) { - parent->first_child = NULL; + if (cc) { + parent_call *pc = get_parent_call(cc->parent); + gpr_mu_lock(&pc->child_list_mu); + if (c == pc->first_child) { + pc->first_child = cc->sibling_next; + if (c == pc->first_child) { + pc->first_child = NULL; } } - c->sibling_prev->sibling_next = c->sibling_next; - c->sibling_next->sibling_prev = c->sibling_prev; - gpr_mu_unlock(&parent->child_list_mu); - GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); + cc->sibling_prev->child_call->sibling_next = cc->sibling_next; + cc->sibling_next->child_call->sibling_prev = cc->sibling_prev; + gpr_mu_unlock(&pc->child_list_mu); + GRPC_CALL_INTERNAL_UNREF(&exec_ctx, cc->parent, "child"); } GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && - !gpr_atm_acq_load(&c->received_final_op_atm); + bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 && + gpr_atm_acq_load(&c->received_final_op_atm) == 0; if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -1089,7 +1119,6 @@ static grpc_error *consolidate_batch_errors(batch_control *bctl) { static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl) { - grpc_call *child_call; grpc_call *next_child_call; grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); @@ -1114,21 +1143,25 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, /* propagate cancellation to any interested children */ gpr_atm_rel_store(&call->received_final_op_atm, 1); - gpr_mu_lock(&call->child_list_mu); - child_call = call->first_child; - if (child_call != NULL) { - do { - next_child_call = child_call->sibling_next; - if (child_call->cancellation_is_inherited) { - GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_CANCELLED); - GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); - } - child_call = next_child_call; - } while (child_call != call->first_child); + parent_call *pc = get_parent_call(call); + if (pc != NULL) { + grpc_call *child; + gpr_mu_lock(&pc->child_list_mu); + child = pc->first_child; + if (child != NULL) { + do { + next_child_call = child->child_call->sibling_next; + if (child->cancellation_is_inherited) { + GRPC_CALL_INTERNAL_REF(child, "propagate_cancel"); + cancel_with_error(exec_ctx, child, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, child, "propagate_cancel"); + } + child = next_child_call; + } while (child != pc->first_child); + } + gpr_mu_unlock(&pc->child_list_mu); } - gpr_mu_unlock(&call->child_list_mu); if (call->is_client) { get_final_status(call, set_status_value_directly, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b4594817e4..3273addf1d 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -345,7 +345,6 @@ static void dump_pending_tags(grpc_completion_queue *cc) {} grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; - grpc_pollset_worker *worker = NULL; gpr_timespec now; GPR_TIMER_BEGIN("grpc_completion_queue_next", 0); @@ -426,8 +425,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(cc->mu); continue; } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), - &worker, now, iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, + now, iteration_deadline); if (err != GRPC_ERROR_NONE) { gpr_mu_unlock(cc->mu); const char *msg = grpc_error_string(err); diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 921ef87e36..746134676f 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -31,6 +31,8 @@ * */ +#include <grpc/support/port_platform.h> + #include "src/core/lib/surface/init.h" #include <limits.h> |