diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c | 13 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epollsig_linux.c | 13 | ||||
-rw-r--r-- | src/core/lib/iomgr/resolve_address_uv.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_uv.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_uv.c | 1 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.c | 15 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer_manager.c | 101 | ||||
-rw-r--r-- | src/core/lib/security/credentials/jwt/jwt_verifier.c | 45 | ||||
-rw-r--r-- | src/core/lib/security/transport/client_auth_filter.c | 100 | ||||
-rw-r--r-- | src/core/lib/security/transport/server_auth_filter.c | 143 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.c | 25 | ||||
-rw-r--r-- | src/core/lib/support/mpscq.h | 26 | ||||
-rw-r--r-- | src/core/lib/support/stack_lockfree.c | 137 | ||||
-rw-r--r-- | src/core/lib/support/stack_lockfree.h | 38 | ||||
-rw-r--r-- | src/core/lib/surface/server.c | 112 | ||||
-rw-r--r-- | src/core/lib/transport/static_metadata.c | 3 |
16 files changed, 506 insertions, 273 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 2c91ad357c..9f82c480bc 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -57,9 +57,6 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) -/* Uncomment the following to enable extra checks on poll_object operations */ -/* #define PO_DEBUG */ - /* The maximum number of polling threads per polling island. By default no limit */ static int g_max_pollers_per_pi = INT_MAX; @@ -92,7 +89,7 @@ typedef enum { } poll_obj_type; typedef struct poll_obj { -#ifdef PO_DEBUG +#ifndef NDEBUG poll_obj_type obj_type; #endif gpr_mu mu; @@ -893,7 +890,7 @@ static grpc_fd *fd_create(int fd, const char *name) { * would be holding a lock to it anyway. */ gpr_mu_lock(&new_fd->po.mu); new_fd->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG new_fd->po.obj_type = POLL_OBJ_FD; #endif @@ -1171,7 +1168,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->po.mu); *mu = &pollset->po.mu; pollset->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pollset->po.obj_type = POLL_OBJ_POLLSET; #endif @@ -1625,7 +1622,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag, poll_obj_type item_type) { GPR_TIMER_BEGIN("add_poll_object", 0); -#ifdef PO_DEBUG +#ifndef NDEBUG GPR_ASSERT(item->obj_type == item_type); GPR_ASSERT(bag->obj_type == bag_type); #endif @@ -1784,7 +1781,7 @@ static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); gpr_mu_init(&pss->po.mu); pss->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pss->po.obj_type = POLL_OBJ_POLLSET_SET; #endif return pss; diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 255e07010b..3c4ca9c7c5 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -54,9 +54,6 @@ gpr_log(GPR_INFO, __VA_ARGS__); \ } -/* Uncomment the following to enable extra checks on poll_object operations */ -/* #define PO_DEBUG */ - static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; @@ -85,7 +82,7 @@ typedef enum { } poll_obj_type; typedef struct poll_obj { -#ifdef PO_DEBUG +#ifndef NDEBUG poll_obj_type obj_type; #endif gpr_mu mu; @@ -821,7 +818,7 @@ static grpc_fd *fd_create(int fd, const char *name) { * would be holding a lock to it anyway. */ gpr_mu_lock(&new_fd->po.mu); new_fd->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG new_fd->po.obj_type = POLL_OBJ_FD; #endif @@ -1079,7 +1076,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->po.mu); *mu = &pollset->po.mu; pollset->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pollset->po.obj_type = POLL_OBJ_POLLSET; #endif @@ -1416,7 +1413,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag, poll_obj_type item_type) { GPR_TIMER_BEGIN("add_poll_object", 0); -#ifdef PO_DEBUG +#ifndef NDEBUG GPR_ASSERT(item->obj_type == item_type); GPR_ASSERT(bag->obj_type == bag_type); #endif @@ -1575,7 +1572,7 @@ static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); gpr_mu_init(&pss->po.mu); pss->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pss->po.obj_type = POLL_OBJ_POLLSET_SET; #endif return pss; diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 45de289e45..a98b8e62db 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -54,7 +54,7 @@ static int retry_named_port_failure(int status, request *r, int retry_status; uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; - r->port = svc[i][1]; + r->port = gpr_strdup(svc[i][1]); retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, r->host, r->port, r->hints); if (retry_status < 0 || getaddrinfo_cb == NULL) { @@ -127,6 +127,8 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error); grpc_exec_ctx_finish(&exec_ctx); gpr_free(r->hints); + gpr_free(r->host); + gpr_free(r->port); gpr_free(r); uv_freeaddrinfo(res); } diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index ab6832932f..2f1d237c07 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -48,6 +48,7 @@ typedef struct grpc_uv_tcp_connect { static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx, grpc_uv_tcp_connect *connect) { grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota); + gpr_free(connect->addr_name); gpr_free(connect); } @@ -105,6 +106,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { } done = (--connect->refs == 0); if (done) { + grpc_exec_ctx_flush(&exec_ctx); uv_tcp_connect_cleanup(&exec_ctx, connect); } GRPC_CLOSURE_SCHED(&exec_ctx, closure, error); @@ -140,6 +142,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, connect->resource_quota = resource_quota; uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; + connect->refs = 1; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2de0ea90e7..2ab836cc34 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -234,6 +234,7 @@ static void on_connect(uv_stream_t *server, int status) { sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, acceptor); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(peer_name_string); } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 7c21b44e76..ff5fd3edc8 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -65,7 +65,10 @@ typedef struct { } grpc_tcp; static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + grpc_slice_unref_internal(exec_ctx, tcp->read_slice); grpc_resource_user_unref(exec_ctx, tcp->resource_user); + gpr_free(tcp->handle); + gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -115,13 +118,17 @@ static void uv_close_callback(uv_handle_t *handle) { grpc_exec_ctx_finish(&exec_ctx); } +static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + return grpc_resource_user_slice_malloc(exec_ctx, resource_user, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE); +} + static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp *tcp = handle->data; (void)suggested_size; - tcp->read_slice = grpc_resource_user_slice_malloc( - &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); grpc_exec_ctx_finish(&exec_ctx); @@ -148,6 +155,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // Successful read sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); grpc_slice_buffer_add(tcp->read_slices, sub); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); error = GRPC_ERROR_NONE; if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t i; @@ -334,6 +342,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); @@ -350,6 +359,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -357,6 +367,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, uv_unref((uv_handle_t *)handle); #endif + grpc_exec_ctx_finish(&exec_ctx); return &tcp->base; } diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 520d4a3252..cb7998db97 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -50,6 +50,9 @@ static completed_thread *g_completed_threads; static bool g_kicked; // is there a thread waiting until the next timer should fire? static bool g_has_timed_waiter; +// the deadline of the current timed waiter thread (only relevant if +// g_has_timed_waiter is true) +static gpr_timespec g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -101,8 +104,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { start_timer_thread_and_unlock(); } else { // if there's no thread waiting with a timeout, kick an existing - // waiter - // so that the next deadline is not missed + // waiter so that the next deadline is not missed if (!g_has_timed_waiter) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "kick untimed waiter"); @@ -132,44 +134,79 @@ static bool wait_until(gpr_timespec next) { gpr_mu_unlock(&g_mu); return false; } - // if there's no timed waiter, we should become one: that waiter waits - // only until the next timer should expire - // all other timers wait forever - uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; - if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) { - g_has_timed_waiter = true; - // we use a generation counter to track the timed waiter so we can - // cancel an existing one quickly (and when it actually times out it'll - // figure stuff out instead of incurring a wakeup) - my_timed_waiter_generation = ++g_timed_waiter_generation; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + + // If g_kicked is true at this point, it means there was a kick from the timer + // system that the timer-manager threads here missed. We cannot trust 'next' + // here any longer (since there might be an earlier deadline). So if g_kicked + // is true at this point, we should quickly exit this and get the next + // deadline from the timer system + + if (!g_kicked) { + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire. All other timers wait forever + // + // 'g_timed_waiter_generation' is a global generation counter. The idea here + // is that the thread becoming a timed-waiter increments and stores this + // global counter locally in 'my_timed_waiter_generation' before going to + // sleep. After waking up, if my_timed_waiter_generation == + // g_timed_waiter_generation, it can be sure that it was the timed_waiter + // thread (and that no other thread took over while this was asleep) + // + // Initialize my_timed_waiter_generation to some value that is NOT equal to + // g_timed_waiter_generation + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + + /* If there's no timed waiter, we should become one: that waiter waits only + until the next timer should expire. All other timer threads wait forever + unless their 'next' is earlier than the current timed-waiter's deadline + (in which case the thread with earlier 'next' takes over as the new timed + waiter) */ + if (gpr_time_cmp(next, inf_future) != 0) { + if (!g_has_timed_waiter || + (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + my_timed_waiter_generation = ++g_timed_waiter_generation; + g_has_timed_waiter = true; + g_timed_waiter_deadline = next; + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_timespec wait_time = + gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", + wait_time.tv_sec, wait_time.tv_nsec); + } + } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline + next = inf_future; + } } - } else { - next = inf_future; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + + if (GRPC_TRACER_ON(grpc_timer_check_trace) && + gpr_time_cmp(next, inf_future) == 0) { gpr_log(GPR_DEBUG, "sleep until kicked"); } + + gpr_cv_wait(&g_cv_wait, &g_mu, next); + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, + g_kicked); + } + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + } } - gpr_cv_wait(&g_cv_wait, &g_mu, next); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", - my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); - } - // if this was the timed waiter, then we need to check timers, and flag - // that there's now no timed waiter... we'll look for a replacement if - // there's work to do after checking timers (code above) - if (my_timed_waiter_generation == g_timed_waiter_generation) { - g_has_timed_waiter = false; - } + // if this was a kick from the timer system, consume it (and don't stop // this thread yet) if (g_kicked) { grpc_timer_consume_kick(); g_kicked = false; } + gpr_mu_unlock(&g_mu); return true; } @@ -257,6 +294,9 @@ void grpc_timer_manager_init(void) { g_waiter_count = 0; g_completed_threads = NULL; + g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + start_threads(); } @@ -302,6 +342,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 8c747085bb..6cd558d123 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -462,6 +462,35 @@ static BIGNUM *bignum_from_base64(grpc_exec_ctx *exec_ctx, const char *b64) { return result; } +#if OPENSSL_VERSION_NUMBER < 0x10100000L + +// Provide compatibility across OpenSSL 1.02 and 1.1. +static int RSA_set0_key(RSA *r, BIGNUM *n, BIGNUM *e, BIGNUM *d) { + /* If the fields n and e in r are NULL, the corresponding input + * parameters MUST be non-NULL for n and e. d may be + * left NULL (in case only the public key is used). + */ + if ((r->n == NULL && n == NULL) || (r->e == NULL && e == NULL)) { + return 0; + } + + if (n != NULL) { + BN_free(r->n); + r->n = n; + } + if (e != NULL) { + BN_free(r->e); + r->e = e; + } + if (d != NULL) { + BN_free(r->d); + r->d = d; + } + + return 1; +} +#endif // OPENSSL_VERSION_NUMBER < 0x10100000L + static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, const char *kty) { const grpc_json *key_prop; @@ -478,21 +507,27 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, gpr_log(GPR_ERROR, "Could not create rsa key."); goto end; } + BIGNUM *tmp_n = NULL; + BIGNUM *tmp_e = NULL; for (key_prop = json->child; key_prop != NULL; key_prop = key_prop->next) { if (strcmp(key_prop->key, "n") == 0) { - rsa->n = + tmp_n = bignum_from_base64(exec_ctx, validate_string_field(key_prop, "n")); - if (rsa->n == NULL) goto end; + if (tmp_n == NULL) goto end; } else if (strcmp(key_prop->key, "e") == 0) { - rsa->e = + tmp_e = bignum_from_base64(exec_ctx, validate_string_field(key_prop, "e")); - if (rsa->e == NULL) goto end; + if (tmp_e == NULL) goto end; } } - if (rsa->e == NULL || rsa->n == NULL) { + if (tmp_e == NULL || tmp_n == NULL) { gpr_log(GPR_ERROR, "Missing RSA public key field."); goto end; } + if (!RSA_set0_key(rsa, tmp_n, tmp_e, NULL)) { + gpr_log(GPR_ERROR, "Cannot set RSA key from inputs."); + goto end; + } result = EVP_PKEY_new(); EVP_PKEY_set1_RSA(result, rsa); /* uprefs rsa. */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 58112b04b4..50a51b31cd 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -49,7 +49,6 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; - grpc_transport_stream_op_batch op; gpr_atm security_context_set; gpr_mu security_context_mu; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; @@ -92,11 +91,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, size_t num_md, grpc_credentials_status status, const char *error_details) { - grpc_call_element *elem = (grpc_call_element *)user_data; + grpc_transport_stream_op_batch *batch = + (grpc_transport_stream_op_batch *)user_data; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; - grpc_transport_stream_op_batch *op = &calld->op; - grpc_metadata_batch *mdb; - size_t i; reset_auth_metadata_context(&calld->auth_md_context); grpc_error *error = GRPC_ERROR_NONE; if (status != GRPC_CREDENTIALS_OK) { @@ -108,9 +106,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); } else { GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); - GPR_ASSERT(op->send_initial_metadata); - mdb = op->payload->send_initial_metadata.send_initial_metadata; - for (i = 0; i < num_md; i++) { + GPR_ASSERT(batch->send_initial_metadata); + grpc_metadata_batch *mdb = + batch->payload->send_initial_metadata.send_initial_metadata; + for (size_t i = 0; i < num_md; i++) { add_error(&error, grpc_metadata_batch_add_tail( exec_ctx, mdb, &calld->md_links[i], @@ -120,9 +119,9 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, } } if (error == GRPC_ERROR_NONE) { - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } else { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error); } } @@ -158,11 +157,11 @@ void build_auth_metadata_context(grpc_security_connector *sc, static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = - (grpc_client_security_context *)op->payload + (grpc_client_security_context *)batch->payload ->context[GRPC_CONTEXT_SECURITY] .value; grpc_call_credentials *channel_call_creds = @@ -171,7 +170,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, if (channel_call_creds == NULL && !call_creds_has_md) { /* Skip sending metadata altogether. */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); return; } @@ -180,7 +179,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, ctx->creds, NULL); if (calld->creds == NULL) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, + exec_ctx, batch, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), @@ -194,28 +193,29 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, build_auth_metadata_context(&chand->security_connector->base, chand->auth_context, calld); - calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT(calld->pollent != NULL); grpc_call_credentials_get_request_metadata( exec_ctx, calld->creds, calld->pollent, calld->auth_md_context, - on_credentials_metadata, elem); + on_credentials_metadata, batch); } static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, grpc_security_status status) { - grpc_call_element *elem = (grpc_call_element *)user_data; + grpc_transport_stream_op_batch *batch = + (grpc_transport_stream_op_batch *)user_data; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; if (status == GRPC_SECURITY_OK) { - send_security_metadata(exec_ctx, elem, &calld->op); + send_security_metadata(exec_ctx, elem, batch); } else { char *error_msg; char *host = grpc_slice_to_c_string(calld->host); gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", host); gpr_free(host); - grpc_call_element_signal_error( - exec_ctx, elem, + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); @@ -223,35 +223,29 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, } } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("auth_start_transport_op", 0); +static void auth_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + GPR_TIMER_BEGIN("auth_start_transport_stream_op_batch", 0); /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_linked_mdelem *l; - grpc_client_security_context *sec_ctx = NULL; - if (!op->cancel_stream) { + if (!batch->cancel_stream) { /* double checked lock over security context to ensure it's set once */ if (gpr_atm_acq_load(&calld->security_context_set) == 0) { gpr_mu_lock(&calld->security_context_mu); if (gpr_atm_acq_load(&calld->security_context_set) == 0) { - GPR_ASSERT(op->payload->context != NULL); - if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - op->payload->context[GRPC_CONTEXT_SECURITY].value = + GPR_ASSERT(batch->payload->context != NULL); + if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + batch->payload->context[GRPC_CONTEXT_SECURITY].value = grpc_client_security_context_create(); - op->payload->context[GRPC_CONTEXT_SECURITY].destroy = + batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = grpc_client_security_context_destroy; } - sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; + grpc_client_security_context *sec_ctx = + batch->payload->context[GRPC_CONTEXT_SECURITY].value; GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); sec_ctx->auth_context = GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); @@ -261,9 +255,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, } } - if (op->send_initial_metadata) { - for (l = op->payload->send_initial_metadata.send_initial_metadata->list - .head; + if (batch->send_initial_metadata) { + for (grpc_linked_mdelem *l = batch->payload->send_initial_metadata + .send_initial_metadata->list.head; l != NULL; l = l->next) { grpc_mdelem md = l->md; /* Pointer comparison is OK for md_elems created from the same context. @@ -284,19 +278,19 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, } if (calld->have_host) { char *call_host = grpc_slice_to_c_string(calld->host); - calld->op = *op; /* Copy op (originates from the caller's stack). */ + batch->handler_private.extra_arg = elem; grpc_channel_security_connector_check_call_host( exec_ctx, chand->security_connector, call_host, chand->auth_context, - on_host_checked, elem); + on_host_checked, batch); gpr_free(call_host); - GPR_TIMER_END("auth_start_transport_op", 0); + GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); return; /* early exit */ } } /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("auth_start_transport_op", 0); + grpc_call_next_op(exec_ctx, elem, batch); + GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -379,7 +373,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, set_pollset_or_pollset_set, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"}; + auth_start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + set_pollset_or_pollset_set, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "client-auth"}; diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 4e6914be7b..9bf3f0ca0f 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -27,14 +27,9 @@ #include "src/core/lib/slice/slice_internal.h" typedef struct call_data { - grpc_metadata_batch *recv_initial_metadata; - /* Closure to call when finished with the auth_on_recv hook. */ - grpc_closure *on_done_recv; - /* Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member after - handling it. */ - grpc_closure auth_on_recv; - grpc_transport_stream_op_batch *transport_op; + grpc_transport_stream_op_batch *recv_initial_metadata_batch; + grpc_closure *original_recv_initial_metadata_ready; + grpc_closure recv_initial_metadata_ready; grpc_metadata_array md; const grpc_metadata *consumed_md; size_t num_consumed_md; @@ -90,125 +85,96 @@ static void on_md_processing_done( grpc_status_code status, const char *error_details) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; + grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { gpr_log(GPR_INFO, "response_md in auth metadata processing not supported for now. " "Ignoring..."); } - + grpc_error *error = GRPC_ERROR_NONE; if (status == GRPC_STATUS_OK) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; - /* TODO(ctiller): propagate error */ - GRPC_LOG_IF_ERROR( - "grpc_metadata_batch_filter", - grpc_metadata_batch_filter(&exec_ctx, calld->recv_initial_metadata, - remove_consumed_md, elem, - "Response metadata filtering error")); - for (size_t i = 0; i < calld->md.count; i++) { - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); - } - grpc_metadata_array_destroy(&calld->md); - GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE); + error = grpc_metadata_batch_filter( + &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, + remove_consumed_md, elem, "Response metadata filtering error"); } else { - for (size_t i = 0; i < calld->md.count; i++) { - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); - } - grpc_metadata_array_destroy(&calld->md); - error_details = error_details != NULL - ? error_details - : "Authentication metadata processing failed."; - if (calld->transport_op->send_message) { - grpc_byte_stream_destroy( - &exec_ctx, calld->transport_op->payload->send_message.send_message); - calld->transport_op->payload->send_message.send_message = NULL; + if (error_details == NULL) { + error_details = "Authentication metadata processing failed."; } - GRPC_CLOSURE_SCHED( - &exec_ctx, calld->on_done_recv, + error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status)); + GRPC_ERROR_INT_GRPC_STATUS, status); } - + for (size_t i = 0; i < calld->md.count; i++) { + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); + } + grpc_metadata_array_destroy(&calld->md); + GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready, + error); grpc_exec_ctx_finish(&exec_ctx); } -static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; if (error == GRPC_ERROR_NONE) { if (chand->creds != NULL && chand->creds->processor.process != NULL) { - calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata); + calld->md = metadata_batch_to_md_array( + batch->payload->recv_initial_metadata.recv_initial_metadata); chand->creds->processor.process( chand->creds->processor.state, calld->auth_context, calld->md.metadata, calld->md.count, on_md_processing_done, elem); return; } } - GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } -static void set_recv_ops_md_callbacks(grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static void auth_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; - - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->on_done_recv = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->auth_on_recv; - calld->transport_op = op; + if (batch->recv_initial_metadata) { + // Inject our callback. + calld->recv_initial_metadata_batch = batch; + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; } -} - -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - set_recv_ops_md_callbacks(elem, op); - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_server_security_context *server_ctx = NULL; - - /* initialize members */ - memset(calld, 0, sizeof(*calld)); - GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - + // Create server security context. Set its auth context from channel + // data and save it in the call context. + grpc_server_security_context *server_ctx = + grpc_server_security_context_create(); + server_ctx->auth_context = grpc_auth_context_create(chand->auth_context); + calld->auth_context = server_ctx->auth_context; if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) { args->context[GRPC_CONTEXT_SECURITY].destroy( args->context[GRPC_CONTEXT_SECURITY].value); } - - server_ctx = grpc_server_security_context_create(); - server_ctx->auth_context = grpc_auth_context_create(chand->auth_context); - calld->auth_context = server_ctx->auth_context; - args->context[GRPC_CONTEXT_SECURITY].value = server_ctx; args->context[GRPC_CONTEXT_SECURITY].destroy = grpc_server_security_context_destroy; - return GRPC_ERROR_NONE; } @@ -221,19 +187,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { + GPR_ASSERT(!args->is_last); + channel_data *chand = elem->channel_data; grpc_auth_context *auth_context = grpc_find_auth_context_in_args(args->channel_args); - grpc_server_credentials *creds = - grpc_find_server_credentials_in_args(args->channel_args); - /* grab pointers to our data from the channel element */ - channel_data *chand = elem->channel_data; - - GPR_ASSERT(!args->is_last); GPR_ASSERT(auth_context != NULL); - - /* initialize members */ chand->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "server_auth_filter"); + grpc_server_credentials *creds = + grpc_find_server_credentials_in_args(args->channel_args); chand->creds = grpc_server_credentials_ref(creds); return GRPC_ERROR_NONE; } @@ -241,14 +203,13 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "server_auth_filter"); grpc_server_credentials_unref(exec_ctx, chand->creds); } const grpc_channel_filter grpc_server_auth_filter = { - auth_start_transport_op, + auth_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 58c4c435d3..e9f893988d 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) { GPR_ASSERT(q->tail == &q->stub); } -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node *prev = (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n); - return prev == &q->stub; } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { @@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) { *empty = false; return NULL; } - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q) { - gpr_mpscq_init(&q->queue); - q->read_lock = GPR_SPINLOCK_INITIALIZER; -} - -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) { - gpr_mpscq_destroy(&q->queue); -} - -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) { - return gpr_mpscq_push(&q->queue, n); -} - -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) { - if (gpr_spinlock_trylock(&q->read_lock)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue); - gpr_spinlock_unlock(&q->read_lock); - return n; - } - return NULL; -} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 2f4739d7f8..daa51768f7 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -22,7 +22,6 @@ #include <grpc/support/atm.h> #include <stdbool.h> #include <stddef.h> -#include "src/core/lib/support/spinlock.h" // Multiple-producer single-consumer lock free queue, based upon the // implementation from Dmitry Vyukov here: @@ -44,34 +43,11 @@ typedef struct gpr_mpscq { void gpr_mpscq_init(gpr_mpscq *q); void gpr_mpscq_destroy(gpr_mpscq *q); // Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); // Pop a node (returns NULL if no node is ready - which doesn't indicate that // the queue is empty!!) -// Thread compatible - can only be called from one thread at a time gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); // Pop a node; sets *empty to true if the queue is empty, or false if it is not gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); -// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing -// only one thread will succeed concurrently -typedef struct gpr_locked_mpscq { - gpr_mpscq queue; - gpr_spinlock read_lock; -} gpr_locked_mpscq; - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q); -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q); -// Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n); -// Pop a node (returns NULL if no node is ready - which doesn't indicate that -// the queue is empty!!) -// Thread safe - can be called from multiple threads concurrently -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q); - #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c new file mode 100644 index 0000000000..0fb64ed001 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.c @@ -0,0 +1,137 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/stack_lockfree.h" + +#include <stdlib.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +/* The lockfree node structure is a single architecture-level + word that allows for an atomic CAS to set it up. */ +struct lockfree_node_contents { + /* next thing to look at. Actual index for head, next index otherwise */ + uint16_t index; +#ifdef GPR_ARCH_64 + uint16_t pad; + uint32_t aba_ctr; +#else +#ifdef GPR_ARCH_32 + uint16_t aba_ctr; +#else +#error Unsupported bit width architecture +#endif +#endif +}; + +/* Use a union to make sure that these are in the same bits as an atm word */ +typedef union lockfree_node { + gpr_atm atm; + struct lockfree_node_contents contents; +} lockfree_node; + +/* make sure that entries aligned to 8-bytes */ +#define ENTRY_ALIGNMENT_BITS 3 +/* reserve this entry as invalid */ +#define INVALID_ENTRY_INDEX ((1 << 16) - 1) + +struct gpr_stack_lockfree { + lockfree_node *entries; + lockfree_node head; /* An atomic entry describing curr head */ +}; + +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { + gpr_stack_lockfree *stack; + stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack)); + /* Since we only allocate 16 bits to represent an entry number, + * make sure that we are within the desired range */ + /* Reserve the highest entry number as a dummy */ + GPR_ASSERT(entries < INVALID_ENTRY_INDEX); + stack->entries = (lockfree_node *)gpr_malloc_aligned( + entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); + /* Clear out all entries */ + memset(stack->entries, 0, entries * sizeof(stack->entries[0])); + memset(&stack->head, 0, sizeof(stack->head)); + + GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents)); + + /* Point the head at reserved dummy entry */ + stack->head.contents.index = INVALID_ENTRY_INDEX; +/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */ +#ifdef GPR_ARCH_64 + stack->head.contents.pad = 0; +#endif + stack->head.contents.aba_ctr = 0; + return stack; +} + +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { + gpr_free_aligned(stack->entries); + gpr_free(stack); +} + +int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { + lockfree_node head; + lockfree_node newhead; + lockfree_node curent; + lockfree_node newent; + + /* First fill in the entry's index and aba ctr for new head */ + newhead.contents.index = (uint16_t)entry; +#ifdef GPR_ARCH_64 + /* Fill in the pad to avoid confusing memcheck tools */ + newhead.contents.pad = 0; +#endif + + /* Also post-increment the aba_ctr */ + curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newhead.contents.aba_ctr = ++curent.contents.aba_ctr; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm); + + do { + /* Atomically get the existing head value for use */ + head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); + /* Point to it */ + newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newent.contents.index = head.contents.index; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm); + } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); + /* Use rel_cas above to make sure that entry index is set properly */ + return head.contents.index == INVALID_ENTRY_INDEX; +} + +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { + lockfree_node head; + lockfree_node newhead; + + do { + head.atm = gpr_atm_acq_load(&(stack->head.atm)); + if (head.contents.index == INVALID_ENTRY_INDEX) { + return -1; + } + newhead.atm = + gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm)); + + } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm)); + + return head.contents.index; +} diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h new file mode 100644 index 0000000000..6324211b72 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.h @@ -0,0 +1,38 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H +#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H + +#include <stddef.h> + +typedef struct gpr_stack_lockfree gpr_stack_lockfree; + +/* This stack must specify the maximum number of entries to track. + The current implementation only allows up to 65534 entries */ +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries); +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack); + +/* Pass in a valid entry number for the next stack entry */ +/* Returns 1 if this is the first element on the stack, 0 otherwise */ +int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); + +/* Returns -1 on empty or the actual entry number */ +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack); + +#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 84ddf74ab9..0cd436883a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -32,8 +32,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/mpscq.h" -#include "src/core/lib/support/spinlock.h" +#include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { - gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void *tag; @@ -162,7 +160,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_locked_mpscq *requests_per_cq; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -207,6 +205,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; + /** requested call backing data */ + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, grpc_server *server) { +static void request_matcher_init(request_matcher *rm, size_t entries, + grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; rm->requests_per_cq = gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&rm->requests_per_cq[i]); + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); } } static void request_matcher_destroy(request_matcher *rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); - gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm, grpc_error *error) { - requested_call *rc; + int request_id; for (size_t i = 0; i < server->cq_count; i++) { - /* Here we know: - 1. no requests are being added (since the server is shut down) - 2. no other threads are pulling (since the shut down process is single - threaded) - So, we can ignore the queue lock and just pop, with the guarantee that a - NULL returned here truly means that the queue is empty */ - while ((rc = (requested_call *)gpr_mpscq_pop( - &rm->requests_per_cq[i].queue)) != NULL) { - fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], + GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); + } else { + gpr_free(req); + } + + server_unref(exec_ctx, server); } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_UNREACHABLE_CODE(return ); } + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + channel_data *chand = elem->channel_data; + server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - requested_call *rc = - (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) { + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { continue; } else { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -992,6 +1016,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1064,15 +1090,29 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, server); + request_matcher_init(&server->unregistered_request_matcher, + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } server_ref(server); @@ -1326,11 +1366,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; + int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); + if (request_id == -1) { + /* out of request ids: just fail this one */ + fail_call(exec_ctx, server, cq_idx, rc, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), + GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); + return GRPC_CALL_OK; + } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1339,13 +1389,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { + server->requested_calls_per_cq[cq_idx][request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) break; + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1361,7 +1413,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1468,6 +1521,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); + server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c index 404c240589..2388f19f81 100644 --- a/src/core/lib/transport/static_metadata.c +++ b/src/core/lib/transport/static_metadata.c @@ -464,7 +464,8 @@ grpc_mdelem grpc_static_mdelem_for_static_strings(int a, int b) { if (a == -1 || b == -1) return GRPC_MDNULL; uint32_t k = (uint32_t)(a * 99 + b); uint32_t h = elems_phash(k); - return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k + return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k && + elem_idxs[h] != 255 ? GRPC_MAKE_MDELEM(&grpc_static_mdelem_table[elem_idxs[h]], GRPC_MDELEM_STORAGE_STATIC) : GRPC_MDNULL; |