diff options
Diffstat (limited to 'src/core')
29 files changed, 161 insertions, 57 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 3813190794..fa0c280f80 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -3242,7 +3242,7 @@ static void on_external_watch_complete_locked(void* arg, grpc_error* error) { "external_connectivity_watcher"); external_connectivity_watcher_list_remove(w->chand, w); gpr_free(w); - GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void watch_connectivity_state_locked(void* arg, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 454e00a690..dab4466b21 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -162,9 +162,7 @@ class LoadBalancingPolicy GRPC_ABSTRACT_BASE_CLASS protected: - // So Delete() can access our protected dtor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE explicit LoadBalancingPolicy(const Args& args); virtual ~LoadBalancingPolicy(); diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h index 02380314dd..c7e37e4468 100644 --- a/src/core/ext/filters/client_channel/resolver.h +++ b/src/core/ext/filters/client_channel/resolver.h @@ -105,9 +105,7 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> { GRPC_ABSTRACT_BASE_CLASS protected: - // So Delete() can access our protected dtor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE /// Does NOT take ownership of the reference to \a combiner. // TODO(roth): Once we have a C++-like interface for combiners, this diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc index 140441da10..ad6b6dd192 100644 --- a/src/core/ext/filters/client_channel/subchannel.cc +++ b/src/core/ext/filters/client_channel/subchannel.cc @@ -408,7 +408,7 @@ static void on_external_state_watcher_done(void* arg, grpc_error* error) { gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher"); gpr_free(w); - GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error)); } static void on_alarm(void* arg, grpc_error* error) { diff --git a/src/core/ext/filters/http/client_authority_filter.cc b/src/core/ext/filters/http/client_authority_filter.cc index 1f57ab5ce6..63b9150aec 100644 --- a/src/core/ext/filters/http/client_authority_filter.cc +++ b/src/core/ext/filters/http/client_authority_filter.cc @@ -59,8 +59,9 @@ void authority_start_transport_stream_op_batch( initial_metadata->idx.named.authority == nullptr) { grpc_error* error = grpc_metadata_batch_add_head( initial_metadata, &calld->authority_storage, - grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY, - grpc_slice_ref(chand->default_authority))); + grpc_mdelem_from_slices( + GRPC_MDSTR_AUTHORITY, + grpc_slice_ref_internal(chand->default_authority))); if (error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure(batch, error, calld->call_combiner); @@ -110,7 +111,7 @@ grpc_error* init_channel_elem(grpc_channel_element* elem, /* Destructor for channel data */ void destroy_channel_elem(grpc_channel_element* elem) { channel_data* chand = static_cast<channel_data*>(elem->channel_data); - grpc_slice_unref(chand->default_authority); + grpc_slice_unref_internal(chand->default_authority); } } // namespace diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc index 7ff7cabfbd..cc4a823798 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc @@ -1684,16 +1684,16 @@ static void send_ping_locked(grpc_chttp2_transport* t, */ static void send_keepalive_ping_locked(grpc_chttp2_transport* t) { if (t->closed_with_error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); - GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked, - GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); + GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked, + GRPC_ERROR_REF(t->closed_with_error)); return; } grpc_chttp2_ping_queue* pq = &t->ping_queue; if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) { /* There is a ping in flight. Add yourself to the inflight closure list. */ - GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE); grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT], &t->finish_keepalive_ping_locked, GRPC_ERROR_NONE); return; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc index 0b88ff7afe..420c2d13e1 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.cc +++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc @@ -736,7 +736,7 @@ static void convert_metadata_to_cronet_headers( if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) { grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem)); value = grpc_slice_to_c_string(wire_value); - grpc_slice_unref(wire_value); + grpc_slice_unref_internal(wire_value); } else { value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem)); } diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc index a9459b150d..ef6482cb7f 100644 --- a/src/core/lib/channel/channel_stack.cc +++ b/src/core/lib/channel/channel_stack.cc @@ -193,18 +193,13 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack, grpc_polling_entity* pollent) { size_t count = call_stack->count; grpc_call_element* call_elems; - char* user_data; size_t i; call_elems = CALL_ELEMS_FROM_STACK(call_stack); - user_data = (reinterpret_cast<char*>(call_elems)) + - ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); /* init per-filter data */ for (i = 0; i < count; i++) { call_elems[i].filter->set_pollset_or_pollset_set(&call_elems[i], pollent); - user_data += - ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data); } } diff --git a/src/core/lib/channel/channel_stack_builder.cc b/src/core/lib/channel/channel_stack_builder.cc index 8a72449034..df5a783631 100644 --- a/src/core/lib/channel/channel_stack_builder.cc +++ b/src/core/lib/channel/channel_stack_builder.cc @@ -25,9 +25,6 @@ #include <grpc/support/alloc.h> #include <grpc/support/string_util.h> -grpc_core::TraceFlag grpc_trace_channel_stack_builder(false, - "channel_stack_builder"); - typedef struct filter_node { struct filter_node* next; struct filter_node* prev; diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h index c9a170bc88..9196de9378 100644 --- a/src/core/lib/channel/channel_stack_builder.h +++ b/src/core/lib/channel/channel_stack_builder.h @@ -155,6 +155,4 @@ grpc_error* grpc_channel_stack_builder_finish( /// Destroy the builder without creating a channel stack void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder* builder); -extern grpc_core::TraceFlag grpc_trace_channel_stack_builder; - #endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */ diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc index 2faeb64cb6..86f8699e04 100644 --- a/src/core/lib/channel/handshaker.cc +++ b/src/core/lib/channel/handshaker.cc @@ -28,6 +28,7 @@ #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/slice/slice_internal.h" grpc_core::TraceFlag grpc_handshaker_trace(false, "handshaker"); @@ -220,8 +221,26 @@ static bool call_next_handshaker_locked(grpc_handshake_manager* mgr, // callback. Otherwise, call the next handshaker. if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early || mgr->index == mgr->count) { + if (error == GRPC_ERROR_NONE && mgr->shutdown) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("handshaker shutdown"); + // TODO(roth): It is currently necessary to shutdown endpoints + // before destroying then, even when we know that there are no + // pending read/write callbacks. This should be fixed, at which + // point this can be removed. + grpc_endpoint_shutdown(mgr->args.endpoint, GRPC_ERROR_REF(error)); + grpc_endpoint_destroy(mgr->args.endpoint); + mgr->args.endpoint = nullptr; + grpc_channel_args_destroy(mgr->args.args); + mgr->args.args = nullptr; + grpc_slice_buffer_destroy_internal(mgr->args.read_buffer); + gpr_free(mgr->args.read_buffer); + mgr->args.read_buffer = nullptr; + } if (grpc_handshaker_trace.enabled()) { - gpr_log(GPR_INFO, "handshake_manager %p: handshaking complete", mgr); + gpr_log(GPR_INFO, + "handshake_manager %p: handshaking complete -- scheduling " + "on_handshake_done with error=%s", + mgr, grpc_error_string(error)); } // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc index 309ece94bb..f8c27db0a8 100644 --- a/src/core/lib/debug/stats_data.cc +++ b/src/core/lib/debug/stats_data.cc @@ -40,6 +40,8 @@ const char* grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = { "pollset_kick_wakeup_fd", "pollset_kick_wakeup_cv", "pollset_kick_own_thread", + "syscall_epoll_ctl", + "pollset_fd_cache_hits", "histogram_slow_lookups", "syscall_write", "syscall_read", @@ -144,6 +146,9 @@ const char* grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = { "polling wakeup (only valid for epoll1 right now)", "How many times could a polling wakeup be satisfied by keeping the waking " "thread awake? (only valid for epoll1 right now)", + "Number of epoll_ctl calls made (only valid for epollex right now)", + "Number of epoll_ctl calls skipped because the fd was cached as already " + "being added. (only valid for epollex right now)", "Number of times histogram increments went through the slow (binary " "search) path", "Number of write syscalls (or equivalent - eg sendmsg) made by this " diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h index 37c548095f..1f3861f494 100644 --- a/src/core/lib/debug/stats_data.h +++ b/src/core/lib/debug/stats_data.h @@ -41,6 +41,8 @@ typedef enum { GRPC_STATS_COUNTER_POLLSET_KICK_WAKEUP_FD, GRPC_STATS_COUNTER_POLLSET_KICK_WAKEUP_CV, GRPC_STATS_COUNTER_POLLSET_KICK_OWN_THREAD, + GRPC_STATS_COUNTER_SYSCALL_EPOLL_CTL, + GRPC_STATS_COUNTER_POLLSET_FD_CACHE_HITS, GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS, GRPC_STATS_COUNTER_SYSCALL_WRITE, GRPC_STATS_COUNTER_SYSCALL_READ, @@ -203,6 +205,10 @@ typedef enum { GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_POLLSET_KICK_WAKEUP_CV) #define GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD() \ GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_POLLSET_KICK_OWN_THREAD) +#define GRPC_STATS_INC_SYSCALL_EPOLL_CTL() \ + GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_SYSCALL_EPOLL_CTL) +#define GRPC_STATS_INC_POLLSET_FD_CACHE_HITS() \ + GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_POLLSET_FD_CACHE_HITS) #define GRPC_STATS_INC_HISTOGRAM_SLOW_LOOKUPS() \ GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS) #define GRPC_STATS_INC_SYSCALL_WRITE() \ @@ -443,6 +449,8 @@ void grpc_stats_inc_server_cqs_checked(int x); #define GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD() #define GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV() #define GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD() +#define GRPC_STATS_INC_SYSCALL_EPOLL_CTL() +#define GRPC_STATS_INC_POLLSET_FD_CACHE_HITS() #define GRPC_STATS_INC_HISTOGRAM_SLOW_LOOKUPS() #define GRPC_STATS_INC_SYSCALL_WRITE() #define GRPC_STATS_INC_SYSCALL_READ() diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml index af4553028e..775b09df74 100644 --- a/src/core/lib/debug/stats_data.yaml +++ b/src/core/lib/debug/stats_data.yaml @@ -63,6 +63,12 @@ doc: How many times could a polling wakeup be satisfied by keeping the waking thread awake? (only valid for epoll1 right now) +# polling +- counter: syscall_epoll_ctl + doc: Number of epoll_ctl calls made (only valid for epollex right now) +- counter: pollset_fd_cache_hits + doc: Number of epoll_ctl calls skipped because the fd was cached as + already being added. (only valid for epollex right now) # stats system - counter: histogram_slow_lookups doc: Number of times histogram increments went through the slow diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql index 04b6d471f6..7d1ab1dae9 100644 --- a/src/core/lib/debug/stats_data_bq_schema.sql +++ b/src/core/lib/debug/stats_data_bq_schema.sql @@ -12,6 +12,8 @@ pollset_kicked_again_per_iteration:FLOAT, pollset_kick_wakeup_fd_per_iteration:FLOAT, pollset_kick_wakeup_cv_per_iteration:FLOAT, pollset_kick_own_thread_per_iteration:FLOAT, +syscall_epoll_ctl_per_iteration:FLOAT, +pollset_fd_cache_hits_per_iteration:FLOAT, histogram_slow_lookups_per_iteration:FLOAT, syscall_write_per_iteration:FLOAT, syscall_read_per_iteration:FLOAT, diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h index ba2f546675..1354109bf3 100644 --- a/src/core/lib/gprpp/memory.h +++ b/src/core/lib/gprpp/memory.h @@ -27,6 +27,17 @@ #include <memory> #include <utility> +// Add this to a class that want to use Delete(), but has a private or +// protected destructor. +#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE \ + template <typename T> \ + friend void Delete(T*); +// Add this to a class that want to use New(), but has a private or +// protected constructor. +#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW \ + template <typename T, typename... Args> \ + friend T* New(Args&&...); + namespace grpc_core { // The alignment of memory returned by gpr_malloc(). diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h index 73a73995c7..d0ec9b6461 100644 --- a/src/core/lib/gprpp/orphanable.h +++ b/src/core/lib/gprpp/orphanable.h @@ -83,9 +83,7 @@ class InternallyRefCounted : public Orphanable { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE // Allow RefCountedPtr<> to access Unref() and IncrementRefCount(). friend class RefCountedPtr<Child>; @@ -128,9 +126,7 @@ class InternallyRefCountedWithTracing : public Orphanable { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE // Allow RefCountedPtr<> to access Unref() and IncrementRefCount(). friend class RefCountedPtr<Child>; diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h index c67e3f315c..ddac5bd475 100644 --- a/src/core/lib/gprpp/ref_counted.h +++ b/src/core/lib/gprpp/ref_counted.h @@ -65,9 +65,7 @@ class RefCounted { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE RefCounted() { gpr_ref_init(&refs_, 1); } @@ -135,9 +133,7 @@ class RefCountedWithTracing { GRPC_ABSTRACT_BASE_CLASS protected: - // Allow Delete() to access destructor. - template <typename T> - friend void Delete(T*); + GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE RefCountedWithTracing() : RefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {} diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc index 9429842eb8..6789e4d12d 100644 --- a/src/core/lib/iomgr/combiner.cc +++ b/src/core/lib/iomgr/combiner.cc @@ -63,11 +63,12 @@ struct grpc_combiner { gpr_refcount refs; }; +static void combiner_run(grpc_closure* closure, grpc_error* error); static void combiner_exec(grpc_closure* closure, grpc_error* error); static void combiner_finally_exec(grpc_closure* closure, grpc_error* error); static const grpc_closure_scheduler_vtable scheduler = { - combiner_exec, combiner_exec, "combiner:immediately"}; + combiner_run, combiner_exec, "combiner:immediately"}; static const grpc_closure_scheduler_vtable finally_scheduler = { combiner_finally_exec, combiner_finally_exec, "combiner:finally"}; @@ -343,6 +344,22 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) { grpc_closure_list_append(&lock->final_list, closure, error); } +static void combiner_run(grpc_closure* closure, grpc_error* error) { + grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler); +#ifndef NDEBUG + closure->scheduled = false; + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]", + lock, closure, closure->file_created, closure->line_created, + closure->file_initiated, closure->line_initiated)); +#endif + GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == + lock); + closure->cb(closure->cb_arg, error); + GRPC_ERROR_UNREF(error); +} + static void enqueue_finally(void* closure, grpc_error* error) { combiner_finally_exec(static_cast<grpc_closure*>(closure), GRPC_ERROR_REF(error)); diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 98369ddd6e..4c6cff7fe2 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -63,6 +63,7 @@ // a keepalive ping timeout issue. We may want to revert https://github // .com/grpc/grpc/pull/14943 once we figure out the root cause. #define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 +#define MAX_PROBE_EPOLL_FDS 32 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, "pollable_refcount"); @@ -75,6 +76,12 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type; typedef struct pollable pollable; +typedef struct cached_fd { + intptr_t salt; + int fd; + uint64_t last_used; +} cached_fd; + /// A pollable is something that can be polled: it has an epoll set to poll on, /// and a wakeup fd for kicks /// There are three broad types: @@ -103,6 +110,11 @@ struct pollable { int event_cursor; int event_count; struct epoll_event events[MAX_EPOLL_EVENTS]; + + // Maintain a LRU-eviction cache of fds in this pollable + cached_fd fd_cache[MAX_PROBE_EPOLL_FDS]; + int fd_cache_size; + uint64_t fd_cache_counter; }; static const char* pollable_type_string(pollable_type t) { @@ -145,8 +157,11 @@ static void pollable_unref(pollable* p, int line, const char* reason); * Fd Declarations */ +static gpr_atm g_fd_salt; + struct grpc_fd { int fd; + intptr_t salt; /* refst format: bit 0 : 1=Active / 0=Orphaned bits 1-n : refcount @@ -354,6 +369,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); @@ -484,6 +500,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { (*p)->root_worker = nullptr; (*p)->event_cursor = 0; (*p)->event_count = 0; + (*p)->fd_cache_size = 0; + (*p)->fd_cache_counter = 0; return GRPC_ERROR_NONE; } @@ -524,7 +542,36 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { grpc_error* error = GRPC_ERROR_NONE; static const char* err_desc = "pollable_add_fd"; const int epfd = p->epfd; + gpr_mu_lock(&p->mu); + p->fd_cache_counter++; + // Handle the case of overflow for our cache counter by + // reseting the recency-counter on all cache objects + if (p->fd_cache_counter == 0) { + for (int i = 0; i < p->fd_cache_size; i++) { + p->fd_cache[i].last_used = 0; + } + } + int lru_idx = 0; + for (int i = 0; i < p->fd_cache_size; i++) { + if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) { + GRPC_STATS_INC_POLLSET_FD_CACHE_HITS(); + p->fd_cache[i].last_used = p->fd_cache_counter; + gpr_mu_unlock(&p->mu); + return GRPC_ERROR_NONE; + } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) { + lru_idx = i; + } + } + // Add to cache + if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) { + lru_idx = p->fd_cache_size; + p->fd_cache_size++; + } + p->fd_cache[lru_idx].fd = fd->fd; + p->fd_cache[lru_idx].salt = fd->salt; + p->fd_cache[lru_idx].last_used = p->fd_cache_counter; + gpr_mu_unlock(&p->mu); if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p); } @@ -533,6 +580,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); ev_fd.data.ptr = fd; + GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { case EEXIST: diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc index 8cf4fe9928..539bc120ce 100644 --- a/src/core/lib/iomgr/resource_quota.cc +++ b/src/core/lib/iomgr/resource_quota.cc @@ -386,7 +386,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) { resource_quota->debug_only_last_reclaimer_resource_user = resource_user; resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = nullptr; - GRPC_CLOSURE_RUN(c, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE); return true; } diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 6144d389f7..900c056575 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" +#include "src/core/lib/slice/slice_internal.h" extern grpc_core::TraceFlag grpc_tcp_trace; @@ -233,7 +234,7 @@ finish: error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_str_slice /* takes ownership */); } else { - grpc_slice_unref(addr_str_slice); + grpc_slice_unref_internal(addr_str_slice); } if (done) { // This is safe even outside the lock, because "done", the sentinel, is diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc index b3b2934014..990e8d632b 100644 --- a/src/core/lib/iomgr/tcp_custom.cc +++ b/src/core/lib/iomgr/tcp_custom.cc @@ -141,7 +141,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) { TCP_UNREF(tcp, "read"); tcp->read_slices = nullptr; tcp->read_cb = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } static void custom_read_callback(grpc_custom_socket* socket, size_t nread, diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 153be05e83..b79ffe20f1 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) { tcp->read_cb = nullptr; tcp->incoming_buffer = nullptr; - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); } #define MAX_READ_IOVEC 4 @@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) { gpr_log(GPR_INFO, "write: %s", str); } - GRPC_CLOSURE_RUN(cb, error); + GRPC_CLOSURE_SCHED(cb, error); TCP_UNREF(tcp, "write"); } } diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 153ac63424..484d2b6077 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -187,11 +187,6 @@ static void on_read(void* arg, grpc_error* err) { goto error; } - read_notifier_pollset = - sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( - &sp->server->next_pollset_to_assign, 1)) % - sp->server->pollset_count]; - /* loop until accept4 returns EAGAIN, and then re-arm notification */ for (;;) { grpc_resolved_address addr; @@ -233,6 +228,11 @@ static void on_read(void* arg, grpc_error* err) { grpc_fd* fdobj = grpc_fd_create(fd, name); + read_notifier_pollset = + sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( + &sp->server->next_pollset_to_assign, 1)) % + sp->server->pollset_count]; + grpc_pollset_add_fd(read_notifier_pollset, fdobj); // Create acceptor. diff --git a/src/core/lib/security/security_connector/alts_security_connector.cc b/src/core/lib/security/security_connector/alts_security_connector.cc index 5ff7d7938b..35a787871a 100644 --- a/src/core/lib/security/security_connector/alts_security_connector.cc +++ b/src/core/lib/security/security_connector/alts_security_connector.cc @@ -30,6 +30,7 @@ #include "src/core/lib/security/credentials/alts/alts_credentials.h" #include "src/core/lib/security/transport/security_handshaker.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/transport/transport.h" #include "src/core/tsi/alts/handshaker/alts_tsi_handshaker.h" @@ -133,7 +134,7 @@ grpc_security_status grpc_alts_auth_context_from_tsi_peer( rpc_versions_prop->value.data, rpc_versions_prop->value.length); bool decode_result = grpc_gcp_rpc_protocol_versions_decode(slice, &peer_versions); - grpc_slice_unref(slice); + grpc_slice_unref_internal(slice); if (!decode_result) { gpr_log(GPR_ERROR, "Invalid peer rpc protocol versions."); return GRPC_SECURITY_ERROR; diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc index a30696703f..b54a7643e4 100644 --- a/src/core/lib/security/security_connector/security_connector.cc +++ b/src/core/lib/security/security_connector/security_connector.cc @@ -69,8 +69,11 @@ void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) { /* Defines the cipher suites that we accept by default. All these cipher suites are compliant with HTTP2. */ -#define GRPC_SSL_CIPHER_SUITES \ - "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384" +#define GRPC_SSL_CIPHER_SUITES \ + "ECDHE-ECDSA-AES128-GCM-SHA256:" \ + "ECDHE-ECDSA-AES256-GCM-SHA384:" \ + "ECDHE-RSA-AES128-GCM-SHA256:" \ + "ECDHE-RSA-AES256-GCM-SHA384" static gpr_once cipher_suites_once = GPR_ONCE_INIT; static const char* cipher_suites = nullptr; diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index da488034ca..7ed1696f80 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -1259,8 +1259,12 @@ static void post_batch_completion(batch_control* bctl) { if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = nullptr; - GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag, - error); + /* This closure may be meant to be run within some combiner. Since we aren't + * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead + * of GRPC_CLOSURE_RUN. + */ + GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag, + error); GRPC_CALL_INTERNAL_UNREF(call, "completion"); } else { /* unrefs bctl->error */ diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc index cb15a71a91..16b85ca0db 100644 --- a/src/core/lib/transport/byte_stream.cc +++ b/src/core/lib/transport/byte_stream.cc @@ -45,7 +45,7 @@ SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer, SliceBufferByteStream::~SliceBufferByteStream() {} void SliceBufferByteStream::Orphan() { - grpc_slice_buffer_destroy(&backing_buffer_); + grpc_slice_buffer_destroy_internal(&backing_buffer_); GRPC_ERROR_UNREF(shutdown_error_); // Note: We do not actually delete the object here, since // SliceBufferByteStream is usually allocated as part of a larger |