diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/iomgr/endpoint.cc | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/endpoint.h | 11 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.cc | 21 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_uv.cc | 23 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_windows.cc | 19 | ||||
-rw-r--r-- | src/core/lib/security/credentials/ssl/ssl_credentials.cc | 60 | ||||
-rw-r--r-- | src/core/lib/security/credentials/ssl/ssl_credentials.h | 15 | ||||
-rw-r--r-- | src/core/lib/security/transport/secure_endpoint.cc | 8 | ||||
-rw-r--r-- | src/core/lib/security/transport/security_connector.cc | 7 | ||||
-rw-r--r-- | src/core/lib/security/transport/security_connector.h | 2 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.cc | 116 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.h | 3 | ||||
-rw-r--r-- | src/core/lib/surface/init.cc | 1 |
13 files changed, 217 insertions, 75 deletions
diff --git a/src/core/lib/iomgr/endpoint.cc b/src/core/lib/iomgr/endpoint.cc index 37cce335ca..5eab1d3158 100644 --- a/src/core/lib/iomgr/endpoint.cc +++ b/src/core/lib/iomgr/endpoint.cc @@ -39,6 +39,12 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx, ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set); } +void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_endpoint* ep, + grpc_pollset_set* pollset_set) { + ep->vtable->delete_from_pollset_set(exec_ctx, ep, pollset_set); +} + void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep, grpc_error* why) { ep->vtable->shutdown(exec_ctx, ep, why); diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index 21347d9023..92964e0f2d 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -45,6 +45,8 @@ struct grpc_endpoint_vtable { grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset); + void (*delete_from_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset); void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep); @@ -85,14 +87,19 @@ void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why); void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep); -/* Add an endpoint to a pollset, so that when the pollset is polled, events from - this endpoint are considered */ +/* Add an endpoint to a pollset or pollset_set, so that when the pollset is + polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set *pollset_set); +/* Delete an endpoint from a pollset_set */ +void grpc_endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set); + grpc_resource_user *grpc_endpoint_get_resource_user(grpc_endpoint *endpoint); struct grpc_endpoint { diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index dbcc976ae9..b7c1803ded 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -704,6 +704,13 @@ static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd); } +static void tcp_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset_set) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_set_del_fd(exec_ctx, pollset_set, tcp->em_fd); +} + static char *tcp_get_peer(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; return gpr_strdup(tcp->peer_string); @@ -719,10 +726,16 @@ static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { return tcp->resource_user; } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_resource_user, tcp_get_peer, - tcp_get_fd}; +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_delete_from_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_resource_user, + tcp_get_peer, + tcp_get_fd}; #define MAX_CHUNK_SIZE 32 * 1024 * 1024 diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index e311964dbc..99b9f1ea2d 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -304,6 +304,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, (void)pollset; } +static void uv_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pollset) { + // No-op. We're ignoring pollsets currently + (void)exec_ctx; + (void)ep; + (void)pollset; +} + static void shutdown_callback(uv_shutdown_t *req, int status) {} static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -340,10 +349,16 @@ static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { static int uv_get_fd(grpc_endpoint *ep) { return -1; } -static grpc_endpoint_vtable vtable = { - uv_endpoint_read, uv_endpoint_write, uv_add_to_pollset, - uv_add_to_pollset_set, uv_endpoint_shutdown, uv_destroy, - uv_get_resource_user, uv_get_peer, uv_get_fd}; +static grpc_endpoint_vtable vtable = {uv_endpoint_read, + uv_endpoint_write, + uv_add_to_pollset, + uv_add_to_pollset_set, + uv_delete_from_pollset_set, + uv_endpoint_shutdown, + uv_destroy, + uv_get_resource_user, + uv_get_peer, + uv_get_fd}; grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, diff --git a/src/core/lib/iomgr/tcp_windows.cc b/src/core/lib/iomgr/tcp_windows.cc index dc84e564a9..6efcff84b8 100644 --- a/src/core/lib/iomgr/tcp_windows.cc +++ b/src/core/lib/iomgr/tcp_windows.cc @@ -371,6 +371,10 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_iocp_add_socket(tcp->socket); } +static void win_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *ep, + grpc_pollset_set *pss) {} + /* Initiates a shutdown of the TCP endpoint. This will queue abort callbacks for the potential read and write operations. It is up to the caller to guarantee this isn't called in parallel to a read or write request, so @@ -412,10 +416,16 @@ static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { static int win_get_fd(grpc_endpoint *ep) { return -1; } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_resource_user, win_get_peer, - win_get_fd}; +static grpc_endpoint_vtable vtable = {win_read, + win_write, + win_add_to_pollset, + win_add_to_pollset_set, + win_delete_from_pollset_set, + win_shutdown, + win_destroy, + win_get_resource_user, + win_get_peer, + win_get_fd}; grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, grpc_channel_args *channel_args, @@ -442,6 +452,7 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); + grpc_resource_quota_unref_internal(exec_ctx, resource_quota); return &tcp->base; } diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc index 290336adc0..8e47aebedb 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc @@ -31,18 +31,21 @@ // SSL Channel Credentials. // -static void ssl_config_pem_key_cert_pair_destroy( - tsi_ssl_pem_key_cert_pair *kp) { +void grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_ssl_pem_key_cert_pair *kp, + size_t num_key_cert_pairs) { if (kp == NULL) return; - gpr_free((void *)kp->private_key); - gpr_free((void *)kp->cert_chain); + for (size_t i = 0; i < num_key_cert_pairs; i++) { + gpr_free((void *)kp[i].private_key); + gpr_free((void *)kp[i].cert_chain); + } + gpr_free(kp); } static void ssl_destruct(grpc_exec_ctx *exec_ctx, grpc_channel_credentials *creds) { grpc_ssl_credentials *c = (grpc_ssl_credentials *)creds; gpr_free(c->config.pem_root_certs); - ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pair); + grpc_tsi_ssl_pem_key_cert_pairs_destroy(c->config.pem_key_cert_pair, 1); } static grpc_security_status ssl_create_security_connector( @@ -85,9 +88,11 @@ static void ssl_build_config(const char *pem_root_certs, if (pem_key_cert_pair != NULL) { GPR_ASSERT(pem_key_cert_pair->private_key != NULL); GPR_ASSERT(pem_key_cert_pair->cert_chain != NULL); - config->pem_key_cert_pair.cert_chain = + config->pem_key_cert_pair = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( + sizeof(tsi_ssl_pem_key_cert_pair)); + config->pem_key_cert_pair->cert_chain = gpr_strdup(pem_key_cert_pair->cert_chain); - config->pem_key_cert_pair.private_key = + config->pem_key_cert_pair->private_key = gpr_strdup(pem_key_cert_pair->private_key); } } @@ -117,11 +122,8 @@ grpc_channel_credentials *grpc_ssl_credentials_create( static void ssl_server_destruct(grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds) { grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; - size_t i; - for (i = 0; i < c->config.num_key_cert_pairs; i++) { - ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pairs[i]); - } - gpr_free(c->config.pem_key_cert_pairs); + grpc_tsi_ssl_pem_key_cert_pairs_destroy(c->config.pem_key_cert_pairs, + c->config.num_key_cert_pairs); gpr_free(c->config.pem_root_certs); } @@ -136,30 +138,36 @@ static grpc_security_status ssl_server_create_security_connector( static grpc_server_credentials_vtable ssl_server_vtable = { ssl_server_destruct, ssl_server_create_security_connector}; +tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs( + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs) { + tsi_ssl_pem_key_cert_pair *tsi_pairs = NULL; + if (num_key_cert_pairs > 0) { + GPR_ASSERT(pem_key_cert_pairs != NULL); + tsi_pairs = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( + num_key_cert_pairs * sizeof(tsi_ssl_pem_key_cert_pair)); + } + for (size_t i = 0; i < num_key_cert_pairs; i++) { + GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); + GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); + tsi_pairs[i].cert_chain = gpr_strdup(pem_key_cert_pairs[i].cert_chain); + tsi_pairs[i].private_key = gpr_strdup(pem_key_cert_pairs[i].private_key); + } + return tsi_pairs; +} + static void ssl_build_server_config( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_config *config) { - size_t i; config->client_certificate_request = client_certificate_request; if (pem_root_certs != NULL) { config->pem_root_certs = gpr_strdup(pem_root_certs); } - if (num_key_cert_pairs > 0) { - GPR_ASSERT(pem_key_cert_pairs != NULL); - config->pem_key_cert_pairs = (tsi_ssl_pem_key_cert_pair *)gpr_zalloc( - num_key_cert_pairs * sizeof(tsi_ssl_pem_key_cert_pair)); - } + config->pem_key_cert_pairs = grpc_convert_grpc_to_tsi_cert_pairs( + pem_key_cert_pairs, num_key_cert_pairs); config->num_key_cert_pairs = num_key_cert_pairs; - for (i = 0; i < num_key_cert_pairs; i++) { - GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); - GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); - config->pem_key_cert_pairs[i].cert_chain = - gpr_strdup(pem_key_cert_pairs[i].cert_chain); - config->pem_key_cert_pairs[i].private_key = - gpr_strdup(pem_key_cert_pairs[i].private_key); - } } grpc_server_credentials *grpc_ssl_server_credentials_create( diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.h b/src/core/lib/security/credentials/ssl/ssl_credentials.h index b43c656cd7..42e425d9f1 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.h +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.h @@ -20,6 +20,10 @@ #include "src/core/lib/security/credentials/credentials.h" +#ifdef __cplusplus +extern "C" { +#endif + typedef struct { grpc_channel_credentials base; grpc_ssl_config config; @@ -30,4 +34,15 @@ typedef struct { grpc_ssl_server_config config; } grpc_ssl_server_credentials; +tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs( + const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs); + +void grpc_tsi_ssl_pem_key_cert_pairs_destroy(tsi_ssl_pem_key_cert_pair *kp, + size_t num_key_cert_pairs); + +#ifdef __cplusplus +} +#endif + #endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_SSL_SSL_CREDENTIALS_H */ diff --git a/src/core/lib/security/transport/secure_endpoint.cc b/src/core/lib/security/transport/secure_endpoint.cc index ae5633b82c..859d04ae5a 100644 --- a/src/core/lib/security/transport/secure_endpoint.cc +++ b/src/core/lib/security/transport/secure_endpoint.cc @@ -379,6 +379,13 @@ static void endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); } +static void endpoint_delete_from_pollset_set(grpc_exec_ctx *exec_ctx, + grpc_endpoint *secure_ep, + grpc_pollset_set *pollset_set) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + grpc_endpoint_delete_from_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set); +} + static char *endpoint_get_peer(grpc_endpoint *secure_ep) { secure_endpoint *ep = (secure_endpoint *)secure_ep; return grpc_endpoint_get_peer(ep->wrapped_ep); @@ -399,6 +406,7 @@ static const grpc_endpoint_vtable vtable = {endpoint_read, endpoint_write, endpoint_add_to_pollset, endpoint_add_to_pollset_set, + endpoint_delete_from_pollset_set, endpoint_shutdown, endpoint_destroy, endpoint_get_resource_user, diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc index 80d9a7b77f..b050be2129 100644 --- a/src/core/lib/security/transport/security_connector.cc +++ b/src/core/lib/security/transport/security_connector.cc @@ -942,10 +942,11 @@ grpc_security_status grpc_ssl_channel_security_connector_create( c->overridden_target_name = gpr_strdup(overridden_target_name); } - has_key_cert_pair = config->pem_key_cert_pair.private_key != NULL && - config->pem_key_cert_pair.cert_chain != NULL; + has_key_cert_pair = config->pem_key_cert_pair != NULL && + config->pem_key_cert_pair->private_key != NULL && + config->pem_key_cert_pair->cert_chain != NULL; result = tsi_create_ssl_client_handshaker_factory( - has_key_cert_pair ? &config->pem_key_cert_pair : NULL, pem_root_certs, + has_key_cert_pair ? config->pem_key_cert_pair : NULL, pem_root_certs, ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols, &c->client_handshaker_factory); if (result != TSI_OK) { diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index 216bb35e81..8287151f44 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -204,7 +204,7 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create( /* Config for ssl clients. */ typedef struct { - tsi_ssl_pem_key_cert_pair pem_key_cert_pair; + tsi_ssl_pem_key_cert_pair *pem_key_cert_pair; char *pem_root_certs; } grpc_ssl_config; diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index 21664f03c8..5009f786e6 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -28,6 +28,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/time.h> +#include <grpc/support/tls.h> #include "src/core/lib/debug/stats.h" #include "src/core/lib/iomgr/pollset.h" @@ -48,6 +49,14 @@ grpc_tracer_flag grpc_trace_cq_refcount = GRPC_TRACER_INITIALIZER(false, "cq_refcount"); #endif +// Specifies a cq thread local cache. +// The first event that occurs on a thread +// with a cq cache will go into that cache, and +// will only be returned on the thread that initialized the cache. +// NOTE: Only one event will ever be cached. +GPR_TLS_DECL(g_cached_event); +GPR_TLS_DECL(g_cached_cq); + typedef struct { grpc_pollset_worker **worker; void *tag; @@ -345,6 +354,46 @@ grpc_tracer_flag grpc_cq_event_timeout_trace = static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cq, grpc_error *error); +void grpc_cq_global_init() { + gpr_tls_init(&g_cached_event); + gpr_tls_init(&g_cached_cq); +} + +void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue *cq) { + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)cq); + } +} + +int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue *cq, + void **tag, int *ok) { + grpc_cq_completion *storage = + (grpc_cq_completion *)gpr_tls_get(&g_cached_event); + int ret = 0; + if (storage != NULL && + (grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq) { + *tag = storage->tag; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + storage->done(&exec_ctx, storage->done_arg, storage); + *ok = (storage->next & (uintptr_t)(1)) == 1; + ret = 1; + cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(&exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "shutting_down"); + } + grpc_exec_ctx_finish(&exec_ctx); + } + gpr_tls_set(&g_cached_event, (intptr_t)0); + gpr_tls_set(&g_cached_cq, (intptr_t)0); + + return ret; +} + static void cq_event_queue_init(grpc_cq_event_queue *q) { gpr_mpscq_init(&q->queue); q->queue_lock = GPR_SPINLOCK_INITIALIZER; @@ -617,7 +666,6 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg); } } - cq_next_data *cqd = (cq_next_data *)DATA_FROM_CQ(cq); int is_success = (error == GRPC_ERROR_NONE); @@ -628,44 +676,50 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, cq_check_tag(cq, tag, true); /* Used in debug builds only */ - /* Add the completion to the queue */ - bool is_first = cq_event_queue_push(&cqd->queue, storage); - gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - - /* Since we do not hold the cq lock here, it is important to do an 'acquire' - load here (instead of a 'no_barrier' load) to match with the release store - (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next - */ - bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; - - if (!will_definitely_shutdown) { - /* Only kick if this is the first item queued */ - if (is_first) { - gpr_mu_lock(cq->mu); - grpc_error *kick_error = - cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); - gpr_mu_unlock(cq->mu); + if ((grpc_completion_queue *)gpr_tls_get(&g_cached_cq) == cq && + (grpc_cq_completion *)gpr_tls_get(&g_cached_event) == nullptr) { + gpr_tls_set(&g_cached_event, (intptr_t)storage); + } else { + /* Add the completion to the queue */ + bool is_first = cq_event_queue_push(&cqd->queue, storage); + gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); + + /* Since we do not hold the cq lock here, it is important to do an 'acquire' + load here (instead of a 'no_barrier' load) to match with the release + store + (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + */ + bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; + + if (!will_definitely_shutdown) { + /* Only kick if this is the first item queued */ + if (is_first) { + gpr_mu_lock(cq->mu); + grpc_error *kick_error = + cq->poller_vtable->kick(exec_ctx, POLLSET_FROM_CQ(cq), NULL); + gpr_mu_unlock(cq->mu); - if (kick_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(kick_error); - gpr_log(GPR_ERROR, "Kick failed: %s", msg); - GRPC_ERROR_UNREF(kick_error); + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + GRPC_ERROR_UNREF(kick_error); + } } - } - if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { + GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_mu_lock(cq->mu); + cq_finish_shutdown_next(exec_ctx, cq); + gpr_mu_unlock(cq->mu); + GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); + } + } else { GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); + gpr_atm_rel_store(&cqd->pending_events, 0); gpr_mu_lock(cq->mu); cq_finish_shutdown_next(exec_ctx, cq); gpr_mu_unlock(cq->mu); GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } - } else { - GRPC_CQ_INTERNAL_REF(cq, "shutting_down"); - gpr_atm_rel_store(&cqd->pending_events, 0); - gpr_mu_lock(cq->mu); - cq_finish_shutdown_next(exec_ctx, cq); - gpr_mu_unlock(cq->mu); - GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down"); } GPR_TIMER_END("cq_end_op_for_next", 0); diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 69d144bd95..c02bc5da07 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -70,6 +70,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc); #define GRPC_CQ_INTERNAL_UNREF(ec, cc, reason) grpc_cq_internal_unref(ec, cc) #endif +/* Initializes global variables used by completion queues */ +void grpc_cq_global_init(); + /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made. \a tag is currently used only in debug builds. Return true on success, and diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index b089da2c54..058e88f804 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -64,6 +64,7 @@ static void do_basic_init(void) { gpr_log_verbosity_init(); gpr_mu_init(&g_init_mu); grpc_register_built_in_plugins(); + grpc_cq_global_init(); g_initializations = 0; } |