aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/filters/client_channel/client_channel.cc2
-rw-r--r--src/core/ext/filters/client_channel/lb_policy.h4
-rw-r--r--src/core/ext/filters/client_channel/resolver.h4
-rw-r--r--src/core/ext/filters/client_channel/subchannel.cc2
-rw-r--r--src/core/ext/filters/http/client_authority_filter.cc7
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc10
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.cc2
-rw-r--r--src/core/lib/channel/channel_stack.cc5
-rw-r--r--src/core/lib/channel/channel_stack_builder.cc3
-rw-r--r--src/core/lib/channel/channel_stack_builder.h2
-rw-r--r--src/core/lib/channel/handshaker.cc21
-rw-r--r--src/core/lib/debug/stats_data.cc5
-rw-r--r--src/core/lib/debug/stats_data.h8
-rw-r--r--src/core/lib/debug/stats_data.yaml6
-rw-r--r--src/core/lib/debug/stats_data_bq_schema.sql2
-rw-r--r--src/core/lib/gprpp/memory.h11
-rw-r--r--src/core/lib/gprpp/orphanable.h8
-rw-r--r--src/core/lib/gprpp/ref_counted.h8
-rw-r--r--src/core/lib/iomgr/combiner.cc19
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc48
-rw-r--r--src/core/lib/iomgr/resource_quota.cc2
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.cc3
-rw-r--r--src/core/lib/iomgr/tcp_custom.cc2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc4
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.cc10
-rw-r--r--src/core/lib/security/security_connector/alts_security_connector.cc3
-rw-r--r--src/core/lib/security/security_connector/security_connector.cc7
-rw-r--r--src/core/lib/surface/call.cc8
-rw-r--r--src/core/lib/transport/byte_stream.cc2
29 files changed, 161 insertions, 57 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 3813190794..fa0c280f80 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -3242,7 +3242,7 @@ static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
"external_connectivity_watcher");
external_connectivity_watcher_list_remove(w->chand, w);
gpr_free(w);
- GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}
static void watch_connectivity_state_locked(void* arg,
diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h
index 454e00a690..dab4466b21 100644
--- a/src/core/ext/filters/client_channel/lb_policy.h
+++ b/src/core/ext/filters/client_channel/lb_policy.h
@@ -162,9 +162,7 @@ class LoadBalancingPolicy
GRPC_ABSTRACT_BASE_CLASS
protected:
- // So Delete() can access our protected dtor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
explicit LoadBalancingPolicy(const Args& args);
virtual ~LoadBalancingPolicy();
diff --git a/src/core/ext/filters/client_channel/resolver.h b/src/core/ext/filters/client_channel/resolver.h
index 02380314dd..c7e37e4468 100644
--- a/src/core/ext/filters/client_channel/resolver.h
+++ b/src/core/ext/filters/client_channel/resolver.h
@@ -105,9 +105,7 @@ class Resolver : public InternallyRefCountedWithTracing<Resolver> {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // So Delete() can access our protected dtor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
/// Does NOT take ownership of the reference to \a combiner.
// TODO(roth): Once we have a C++-like interface for combiners, this
diff --git a/src/core/ext/filters/client_channel/subchannel.cc b/src/core/ext/filters/client_channel/subchannel.cc
index 140441da10..ad6b6dd192 100644
--- a/src/core/ext/filters/client_channel/subchannel.cc
+++ b/src/core/ext/filters/client_channel/subchannel.cc
@@ -408,7 +408,7 @@ static void on_external_state_watcher_done(void* arg, grpc_error* error) {
gpr_mu_unlock(&w->subchannel->mu);
GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher");
gpr_free(w);
- GRPC_CLOSURE_RUN(follow_up, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}
static void on_alarm(void* arg, grpc_error* error) {
diff --git a/src/core/ext/filters/http/client_authority_filter.cc b/src/core/ext/filters/http/client_authority_filter.cc
index 1f57ab5ce6..63b9150aec 100644
--- a/src/core/ext/filters/http/client_authority_filter.cc
+++ b/src/core/ext/filters/http/client_authority_filter.cc
@@ -59,8 +59,9 @@ void authority_start_transport_stream_op_batch(
initial_metadata->idx.named.authority == nullptr) {
grpc_error* error = grpc_metadata_batch_add_head(
initial_metadata, &calld->authority_storage,
- grpc_mdelem_from_slices(GRPC_MDSTR_AUTHORITY,
- grpc_slice_ref(chand->default_authority)));
+ grpc_mdelem_from_slices(
+ GRPC_MDSTR_AUTHORITY,
+ grpc_slice_ref_internal(chand->default_authority)));
if (error != GRPC_ERROR_NONE) {
grpc_transport_stream_op_batch_finish_with_failure(batch, error,
calld->call_combiner);
@@ -110,7 +111,7 @@ grpc_error* init_channel_elem(grpc_channel_element* elem,
/* Destructor for channel data */
void destroy_channel_elem(grpc_channel_element* elem) {
channel_data* chand = static_cast<channel_data*>(elem->channel_data);
- grpc_slice_unref(chand->default_authority);
+ grpc_slice_unref_internal(chand->default_authority);
}
} // namespace
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index 7ff7cabfbd..cc4a823798 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1684,16 +1684,16 @@ static void send_ping_locked(grpc_chttp2_transport* t,
*/
static void send_keepalive_ping_locked(grpc_chttp2_transport* t) {
if (t->closed_with_error != GRPC_ERROR_NONE) {
- GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked,
- GRPC_ERROR_REF(t->closed_with_error));
- GRPC_CLOSURE_SCHED(&t->finish_keepalive_ping_locked,
- GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked,
+ GRPC_ERROR_REF(t->closed_with_error));
+ GRPC_CLOSURE_RUN(&t->finish_keepalive_ping_locked,
+ GRPC_ERROR_REF(t->closed_with_error));
return;
}
grpc_chttp2_ping_queue* pq = &t->ping_queue;
if (!grpc_closure_list_empty(pq->lists[GRPC_CHTTP2_PCL_INFLIGHT])) {
/* There is a ping in flight. Add yourself to the inflight closure list. */
- GRPC_CLOSURE_SCHED(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(&t->start_keepalive_ping_locked, GRPC_ERROR_NONE);
grpc_closure_list_append(&pq->lists[GRPC_CHTTP2_PCL_INFLIGHT],
&t->finish_keepalive_ping_locked, GRPC_ERROR_NONE);
return;
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.cc b/src/core/ext/transport/cronet/transport/cronet_transport.cc
index 0b88ff7afe..420c2d13e1 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.cc
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.cc
@@ -736,7 +736,7 @@ static void convert_metadata_to_cronet_headers(
if (grpc_is_binary_header(GRPC_MDKEY(mdelem))) {
grpc_slice wire_value = grpc_chttp2_base64_encode(GRPC_MDVALUE(mdelem));
value = grpc_slice_to_c_string(wire_value);
- grpc_slice_unref(wire_value);
+ grpc_slice_unref_internal(wire_value);
} else {
value = grpc_slice_to_c_string(GRPC_MDVALUE(mdelem));
}
diff --git a/src/core/lib/channel/channel_stack.cc b/src/core/lib/channel/channel_stack.cc
index a9459b150d..ef6482cb7f 100644
--- a/src/core/lib/channel/channel_stack.cc
+++ b/src/core/lib/channel/channel_stack.cc
@@ -193,18 +193,13 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
grpc_polling_entity* pollent) {
size_t count = call_stack->count;
grpc_call_element* call_elems;
- char* user_data;
size_t i;
call_elems = CALL_ELEMS_FROM_STACK(call_stack);
- user_data = (reinterpret_cast<char*>(call_elems)) +
- ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
for (i = 0; i < count; i++) {
call_elems[i].filter->set_pollset_or_pollset_set(&call_elems[i], pollent);
- user_data +=
- ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
}
diff --git a/src/core/lib/channel/channel_stack_builder.cc b/src/core/lib/channel/channel_stack_builder.cc
index 8a72449034..df5a783631 100644
--- a/src/core/lib/channel/channel_stack_builder.cc
+++ b/src/core/lib/channel/channel_stack_builder.cc
@@ -25,9 +25,6 @@
#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>
-grpc_core::TraceFlag grpc_trace_channel_stack_builder(false,
- "channel_stack_builder");
-
typedef struct filter_node {
struct filter_node* next;
struct filter_node* prev;
diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h
index c9a170bc88..9196de9378 100644
--- a/src/core/lib/channel/channel_stack_builder.h
+++ b/src/core/lib/channel/channel_stack_builder.h
@@ -155,6 +155,4 @@ grpc_error* grpc_channel_stack_builder_finish(
/// Destroy the builder without creating a channel stack
void grpc_channel_stack_builder_destroy(grpc_channel_stack_builder* builder);
-extern grpc_core::TraceFlag grpc_trace_channel_stack_builder;
-
#endif /* GRPC_CORE_LIB_CHANNEL_CHANNEL_STACK_BUILDER_H */
diff --git a/src/core/lib/channel/handshaker.cc b/src/core/lib/channel/handshaker.cc
index 2faeb64cb6..86f8699e04 100644
--- a/src/core/lib/channel/handshaker.cc
+++ b/src/core/lib/channel/handshaker.cc
@@ -28,6 +28,7 @@
#include "src/core/lib/channel/handshaker.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_internal.h"
grpc_core::TraceFlag grpc_handshaker_trace(false, "handshaker");
@@ -220,8 +221,26 @@ static bool call_next_handshaker_locked(grpc_handshake_manager* mgr,
// callback. Otherwise, call the next handshaker.
if (error != GRPC_ERROR_NONE || mgr->shutdown || mgr->args.exit_early ||
mgr->index == mgr->count) {
+ if (error == GRPC_ERROR_NONE && mgr->shutdown) {
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("handshaker shutdown");
+ // TODO(roth): It is currently necessary to shutdown endpoints
+ // before destroying then, even when we know that there are no
+ // pending read/write callbacks. This should be fixed, at which
+ // point this can be removed.
+ grpc_endpoint_shutdown(mgr->args.endpoint, GRPC_ERROR_REF(error));
+ grpc_endpoint_destroy(mgr->args.endpoint);
+ mgr->args.endpoint = nullptr;
+ grpc_channel_args_destroy(mgr->args.args);
+ mgr->args.args = nullptr;
+ grpc_slice_buffer_destroy_internal(mgr->args.read_buffer);
+ gpr_free(mgr->args.read_buffer);
+ mgr->args.read_buffer = nullptr;
+ }
if (grpc_handshaker_trace.enabled()) {
- gpr_log(GPR_INFO, "handshake_manager %p: handshaking complete", mgr);
+ gpr_log(GPR_INFO,
+ "handshake_manager %p: handshaking complete -- scheduling "
+ "on_handshake_done with error=%s",
+ mgr, grpc_error_string(error));
}
// Cancel deadline timer, since we're invoking the on_handshake_done
// callback now.
diff --git a/src/core/lib/debug/stats_data.cc b/src/core/lib/debug/stats_data.cc
index 309ece94bb..f8c27db0a8 100644
--- a/src/core/lib/debug/stats_data.cc
+++ b/src/core/lib/debug/stats_data.cc
@@ -40,6 +40,8 @@ const char* grpc_stats_counter_name[GRPC_STATS_COUNTER_COUNT] = {
"pollset_kick_wakeup_fd",
"pollset_kick_wakeup_cv",
"pollset_kick_own_thread",
+ "syscall_epoll_ctl",
+ "pollset_fd_cache_hits",
"histogram_slow_lookups",
"syscall_write",
"syscall_read",
@@ -144,6 +146,9 @@ const char* grpc_stats_counter_doc[GRPC_STATS_COUNTER_COUNT] = {
"polling wakeup (only valid for epoll1 right now)",
"How many times could a polling wakeup be satisfied by keeping the waking "
"thread awake? (only valid for epoll1 right now)",
+ "Number of epoll_ctl calls made (only valid for epollex right now)",
+ "Number of epoll_ctl calls skipped because the fd was cached as already "
+ "being added. (only valid for epollex right now)",
"Number of times histogram increments went through the slow (binary "
"search) path",
"Number of write syscalls (or equivalent - eg sendmsg) made by this "
diff --git a/src/core/lib/debug/stats_data.h b/src/core/lib/debug/stats_data.h
index 37c548095f..1f3861f494 100644
--- a/src/core/lib/debug/stats_data.h
+++ b/src/core/lib/debug/stats_data.h
@@ -41,6 +41,8 @@ typedef enum {
GRPC_STATS_COUNTER_POLLSET_KICK_WAKEUP_FD,
GRPC_STATS_COUNTER_POLLSET_KICK_WAKEUP_CV,
GRPC_STATS_COUNTER_POLLSET_KICK_OWN_THREAD,
+ GRPC_STATS_COUNTER_SYSCALL_EPOLL_CTL,
+ GRPC_STATS_COUNTER_POLLSET_FD_CACHE_HITS,
GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS,
GRPC_STATS_COUNTER_SYSCALL_WRITE,
GRPC_STATS_COUNTER_SYSCALL_READ,
@@ -203,6 +205,10 @@ typedef enum {
GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_POLLSET_KICK_WAKEUP_CV)
#define GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD() \
GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_POLLSET_KICK_OWN_THREAD)
+#define GRPC_STATS_INC_SYSCALL_EPOLL_CTL() \
+ GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_SYSCALL_EPOLL_CTL)
+#define GRPC_STATS_INC_POLLSET_FD_CACHE_HITS() \
+ GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_POLLSET_FD_CACHE_HITS)
#define GRPC_STATS_INC_HISTOGRAM_SLOW_LOOKUPS() \
GRPC_STATS_INC_COUNTER(GRPC_STATS_COUNTER_HISTOGRAM_SLOW_LOOKUPS)
#define GRPC_STATS_INC_SYSCALL_WRITE() \
@@ -443,6 +449,8 @@ void grpc_stats_inc_server_cqs_checked(int x);
#define GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD()
#define GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV()
#define GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD()
+#define GRPC_STATS_INC_SYSCALL_EPOLL_CTL()
+#define GRPC_STATS_INC_POLLSET_FD_CACHE_HITS()
#define GRPC_STATS_INC_HISTOGRAM_SLOW_LOOKUPS()
#define GRPC_STATS_INC_SYSCALL_WRITE()
#define GRPC_STATS_INC_SYSCALL_READ()
diff --git a/src/core/lib/debug/stats_data.yaml b/src/core/lib/debug/stats_data.yaml
index af4553028e..775b09df74 100644
--- a/src/core/lib/debug/stats_data.yaml
+++ b/src/core/lib/debug/stats_data.yaml
@@ -63,6 +63,12 @@
doc: How many times could a polling wakeup be satisfied by keeping the waking
thread awake?
(only valid for epoll1 right now)
+# polling
+- counter: syscall_epoll_ctl
+ doc: Number of epoll_ctl calls made (only valid for epollex right now)
+- counter: pollset_fd_cache_hits
+ doc: Number of epoll_ctl calls skipped because the fd was cached as
+ already being added. (only valid for epollex right now)
# stats system
- counter: histogram_slow_lookups
doc: Number of times histogram increments went through the slow
diff --git a/src/core/lib/debug/stats_data_bq_schema.sql b/src/core/lib/debug/stats_data_bq_schema.sql
index 04b6d471f6..7d1ab1dae9 100644
--- a/src/core/lib/debug/stats_data_bq_schema.sql
+++ b/src/core/lib/debug/stats_data_bq_schema.sql
@@ -12,6 +12,8 @@ pollset_kicked_again_per_iteration:FLOAT,
pollset_kick_wakeup_fd_per_iteration:FLOAT,
pollset_kick_wakeup_cv_per_iteration:FLOAT,
pollset_kick_own_thread_per_iteration:FLOAT,
+syscall_epoll_ctl_per_iteration:FLOAT,
+pollset_fd_cache_hits_per_iteration:FLOAT,
histogram_slow_lookups_per_iteration:FLOAT,
syscall_write_per_iteration:FLOAT,
syscall_read_per_iteration:FLOAT,
diff --git a/src/core/lib/gprpp/memory.h b/src/core/lib/gprpp/memory.h
index ba2f546675..1354109bf3 100644
--- a/src/core/lib/gprpp/memory.h
+++ b/src/core/lib/gprpp/memory.h
@@ -27,6 +27,17 @@
#include <memory>
#include <utility>
+// Add this to a class that want to use Delete(), but has a private or
+// protected destructor.
+#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE \
+ template <typename T> \
+ friend void Delete(T*);
+// Add this to a class that want to use New(), but has a private or
+// protected constructor.
+#define GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW \
+ template <typename T, typename... Args> \
+ friend T* New(Args&&...);
+
namespace grpc_core {
// The alignment of memory returned by gpr_malloc().
diff --git a/src/core/lib/gprpp/orphanable.h b/src/core/lib/gprpp/orphanable.h
index 73a73995c7..d0ec9b6461 100644
--- a/src/core/lib/gprpp/orphanable.h
+++ b/src/core/lib/gprpp/orphanable.h
@@ -83,9 +83,7 @@ class InternallyRefCounted : public Orphanable {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
// Allow RefCountedPtr<> to access Unref() and IncrementRefCount().
friend class RefCountedPtr<Child>;
@@ -128,9 +126,7 @@ class InternallyRefCountedWithTracing : public Orphanable {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
// Allow RefCountedPtr<> to access Unref() and IncrementRefCount().
friend class RefCountedPtr<Child>;
diff --git a/src/core/lib/gprpp/ref_counted.h b/src/core/lib/gprpp/ref_counted.h
index c67e3f315c..ddac5bd475 100644
--- a/src/core/lib/gprpp/ref_counted.h
+++ b/src/core/lib/gprpp/ref_counted.h
@@ -65,9 +65,7 @@ class RefCounted {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
RefCounted() { gpr_ref_init(&refs_, 1); }
@@ -135,9 +133,7 @@ class RefCountedWithTracing {
GRPC_ABSTRACT_BASE_CLASS
protected:
- // Allow Delete() to access destructor.
- template <typename T>
- friend void Delete(T*);
+ GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
RefCountedWithTracing()
: RefCountedWithTracing(static_cast<TraceFlag*>(nullptr)) {}
diff --git a/src/core/lib/iomgr/combiner.cc b/src/core/lib/iomgr/combiner.cc
index 9429842eb8..6789e4d12d 100644
--- a/src/core/lib/iomgr/combiner.cc
+++ b/src/core/lib/iomgr/combiner.cc
@@ -63,11 +63,12 @@ struct grpc_combiner {
gpr_refcount refs;
};
+static void combiner_run(grpc_closure* closure, grpc_error* error);
static void combiner_exec(grpc_closure* closure, grpc_error* error);
static void combiner_finally_exec(grpc_closure* closure, grpc_error* error);
static const grpc_closure_scheduler_vtable scheduler = {
- combiner_exec, combiner_exec, "combiner:immediately"};
+ combiner_run, combiner_exec, "combiner:immediately"};
static const grpc_closure_scheduler_vtable finally_scheduler = {
combiner_finally_exec, combiner_finally_exec, "combiner:finally"};
@@ -343,6 +344,22 @@ static void combiner_finally_exec(grpc_closure* closure, grpc_error* error) {
grpc_closure_list_append(&lock->final_list, closure, error);
}
+static void combiner_run(grpc_closure* closure, grpc_error* error) {
+ grpc_combiner* lock = COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler);
+#ifndef NDEBUG
+ closure->scheduled = false;
+ GRPC_COMBINER_TRACE(gpr_log(
+ GPR_DEBUG,
+ "Combiner:%p grpc_combiner_run closure:%p created [%s:%d] run [%s:%d]",
+ lock, closure, closure->file_created, closure->line_created,
+ closure->file_initiated, closure->line_initiated));
+#endif
+ GPR_ASSERT(grpc_core::ExecCtx::Get()->combiner_data()->active_combiner ==
+ lock);
+ closure->cb(closure->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+}
+
static void enqueue_finally(void* closure, grpc_error* error) {
combiner_finally_exec(static_cast<grpc_closure*>(closure),
GRPC_ERROR_REF(error));
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 98369ddd6e..4c6cff7fe2 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -63,6 +63,7 @@
// a keepalive ping timeout issue. We may want to revert https://github
// .com/grpc/grpc/pull/14943 once we figure out the root cause.
#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16
+#define MAX_PROBE_EPOLL_FDS 32
grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false,
"pollable_refcount");
@@ -75,6 +76,12 @@ typedef enum { PO_MULTI, PO_FD, PO_EMPTY } pollable_type;
typedef struct pollable pollable;
+typedef struct cached_fd {
+ intptr_t salt;
+ int fd;
+ uint64_t last_used;
+} cached_fd;
+
/// A pollable is something that can be polled: it has an epoll set to poll on,
/// and a wakeup fd for kicks
/// There are three broad types:
@@ -103,6 +110,11 @@ struct pollable {
int event_cursor;
int event_count;
struct epoll_event events[MAX_EPOLL_EVENTS];
+
+ // Maintain a LRU-eviction cache of fds in this pollable
+ cached_fd fd_cache[MAX_PROBE_EPOLL_FDS];
+ int fd_cache_size;
+ uint64_t fd_cache_counter;
};
static const char* pollable_type_string(pollable_type t) {
@@ -145,8 +157,11 @@ static void pollable_unref(pollable* p, int line, const char* reason);
* Fd Declarations
*/
+static gpr_atm g_fd_salt;
+
struct grpc_fd {
int fd;
+ intptr_t salt;
/* refst format:
bit 0 : 1=Active / 0=Orphaned
bits 1-n : refcount
@@ -354,6 +369,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
new_fd->pollable_obj = nullptr;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
+ new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1);
new_fd->read_closure->InitEvent();
new_fd->write_closure->InitEvent();
gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
@@ -484,6 +500,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) {
(*p)->root_worker = nullptr;
(*p)->event_cursor = 0;
(*p)->event_count = 0;
+ (*p)->fd_cache_size = 0;
+ (*p)->fd_cache_counter = 0;
return GRPC_ERROR_NONE;
}
@@ -524,7 +542,36 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
grpc_error* error = GRPC_ERROR_NONE;
static const char* err_desc = "pollable_add_fd";
const int epfd = p->epfd;
+ gpr_mu_lock(&p->mu);
+ p->fd_cache_counter++;
+ // Handle the case of overflow for our cache counter by
+ // reseting the recency-counter on all cache objects
+ if (p->fd_cache_counter == 0) {
+ for (int i = 0; i < p->fd_cache_size; i++) {
+ p->fd_cache[i].last_used = 0;
+ }
+ }
+ int lru_idx = 0;
+ for (int i = 0; i < p->fd_cache_size; i++) {
+ if (p->fd_cache[i].fd == fd->fd && p->fd_cache[i].salt == fd->salt) {
+ GRPC_STATS_INC_POLLSET_FD_CACHE_HITS();
+ p->fd_cache[i].last_used = p->fd_cache_counter;
+ gpr_mu_unlock(&p->mu);
+ return GRPC_ERROR_NONE;
+ } else if (p->fd_cache[i].last_used < p->fd_cache[lru_idx].last_used) {
+ lru_idx = i;
+ }
+ }
+ // Add to cache
+ if (p->fd_cache_size < MAX_PROBE_EPOLL_FDS) {
+ lru_idx = p->fd_cache_size;
+ p->fd_cache_size++;
+ }
+ p->fd_cache[lru_idx].fd = fd->fd;
+ p->fd_cache[lru_idx].salt = fd->salt;
+ p->fd_cache[lru_idx].last_used = p->fd_cache_counter;
+ gpr_mu_unlock(&p->mu);
if (grpc_polling_trace.enabled()) {
gpr_log(GPR_INFO, "add fd %p (%d) to pollable %p", fd, fd->fd, p);
}
@@ -533,6 +580,7 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) {
ev_fd.events =
static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE);
ev_fd.data.ptr = fd;
+ GRPC_STATS_INC_SYSCALL_EPOLL_CTL();
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) {
switch (errno) {
case EEXIST:
diff --git a/src/core/lib/iomgr/resource_quota.cc b/src/core/lib/iomgr/resource_quota.cc
index 8cf4fe9928..539bc120ce 100644
--- a/src/core/lib/iomgr/resource_quota.cc
+++ b/src/core/lib/iomgr/resource_quota.cc
@@ -386,7 +386,7 @@ static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = nullptr;
- GRPC_CLOSURE_RUN(c, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
return true;
}
diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc
index 6144d389f7..900c056575 100644
--- a/src/core/lib/iomgr/tcp_client_posix.cc
+++ b/src/core/lib/iomgr/tcp_client_posix.cc
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
+#include "src/core/lib/slice/slice_internal.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
@@ -233,7 +234,7 @@ finish:
error = grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS,
addr_str_slice /* takes ownership */);
} else {
- grpc_slice_unref(addr_str_slice);
+ grpc_slice_unref_internal(addr_str_slice);
}
if (done) {
// This is safe even outside the lock, because "done", the sentinel, is
diff --git a/src/core/lib/iomgr/tcp_custom.cc b/src/core/lib/iomgr/tcp_custom.cc
index b3b2934014..990e8d632b 100644
--- a/src/core/lib/iomgr/tcp_custom.cc
+++ b/src/core/lib/iomgr/tcp_custom.cc
@@ -141,7 +141,7 @@ static void call_read_cb(custom_tcp_endpoint* tcp, grpc_error* error) {
TCP_UNREF(tcp, "read");
tcp->read_slices = nullptr;
tcp->read_cb = nullptr;
- GRPC_CLOSURE_RUN(cb, error);
+ GRPC_CLOSURE_SCHED(cb, error);
}
static void custom_read_callback(grpc_custom_socket* socket, size_t nread,
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 153be05e83..b79ffe20f1 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -366,7 +366,7 @@ static void call_read_cb(grpc_tcp* tcp, grpc_error* error) {
tcp->read_cb = nullptr;
tcp->incoming_buffer = nullptr;
- GRPC_CLOSURE_RUN(cb, error);
+ GRPC_CLOSURE_SCHED(cb, error);
}
#define MAX_READ_IOVEC 4
@@ -629,7 +629,7 @@ static void tcp_handle_write(void* arg /* grpc_tcp */, grpc_error* error) {
gpr_log(GPR_INFO, "write: %s", str);
}
- GRPC_CLOSURE_RUN(cb, error);
+ GRPC_CLOSURE_SCHED(cb, error);
TCP_UNREF(tcp, "write");
}
}
diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc
index 153ac63424..484d2b6077 100644
--- a/src/core/lib/iomgr/tcp_server_posix.cc
+++ b/src/core/lib/iomgr/tcp_server_posix.cc
@@ -187,11 +187,6 @@ static void on_read(void* arg, grpc_error* err) {
goto error;
}
- read_notifier_pollset =
- sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
- &sp->server->next_pollset_to_assign, 1)) %
- sp->server->pollset_count];
-
/* loop until accept4 returns EAGAIN, and then re-arm notification */
for (;;) {
grpc_resolved_address addr;
@@ -233,6 +228,11 @@ static void on_read(void* arg, grpc_error* err) {
grpc_fd* fdobj = grpc_fd_create(fd, name);
+ read_notifier_pollset =
+ sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
+ &sp->server->next_pollset_to_assign, 1)) %
+ sp->server->pollset_count];
+
grpc_pollset_add_fd(read_notifier_pollset, fdobj);
// Create acceptor.
diff --git a/src/core/lib/security/security_connector/alts_security_connector.cc b/src/core/lib/security/security_connector/alts_security_connector.cc
index 5ff7d7938b..35a787871a 100644
--- a/src/core/lib/security/security_connector/alts_security_connector.cc
+++ b/src/core/lib/security/security_connector/alts_security_connector.cc
@@ -30,6 +30,7 @@
#include "src/core/lib/security/credentials/alts/alts_credentials.h"
#include "src/core/lib/security/transport/security_handshaker.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/tsi/alts/handshaker/alts_tsi_handshaker.h"
@@ -133,7 +134,7 @@ grpc_security_status grpc_alts_auth_context_from_tsi_peer(
rpc_versions_prop->value.data, rpc_versions_prop->value.length);
bool decode_result =
grpc_gcp_rpc_protocol_versions_decode(slice, &peer_versions);
- grpc_slice_unref(slice);
+ grpc_slice_unref_internal(slice);
if (!decode_result) {
gpr_log(GPR_ERROR, "Invalid peer rpc protocol versions.");
return GRPC_SECURITY_ERROR;
diff --git a/src/core/lib/security/security_connector/security_connector.cc b/src/core/lib/security/security_connector/security_connector.cc
index a30696703f..b54a7643e4 100644
--- a/src/core/lib/security/security_connector/security_connector.cc
+++ b/src/core/lib/security/security_connector/security_connector.cc
@@ -69,8 +69,11 @@ void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) {
/* Defines the cipher suites that we accept by default. All these cipher suites
are compliant with HTTP2. */
-#define GRPC_SSL_CIPHER_SUITES \
- "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384"
+#define GRPC_SSL_CIPHER_SUITES \
+ "ECDHE-ECDSA-AES128-GCM-SHA256:" \
+ "ECDHE-ECDSA-AES256-GCM-SHA384:" \
+ "ECDHE-RSA-AES128-GCM-SHA256:" \
+ "ECDHE-RSA-AES256-GCM-SHA384"
static gpr_once cipher_suites_once = GPR_ONCE_INIT;
static const char* cipher_suites = nullptr;
diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc
index da488034ca..7ed1696f80 100644
--- a/src/core/lib/surface/call.cc
+++ b/src/core/lib/surface/call.cc
@@ -1259,8 +1259,12 @@ static void post_batch_completion(batch_control* bctl) {
if (bctl->completion_data.notify_tag.is_closure) {
/* unrefs bctl->error */
bctl->call = nullptr;
- GRPC_CLOSURE_RUN((grpc_closure*)bctl->completion_data.notify_tag.tag,
- error);
+ /* This closure may be meant to be run within some combiner. Since we aren't
+ * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
+ * of GRPC_CLOSURE_RUN.
+ */
+ GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
+ error);
GRPC_CALL_INTERNAL_UNREF(call, "completion");
} else {
/* unrefs bctl->error */
diff --git a/src/core/lib/transport/byte_stream.cc b/src/core/lib/transport/byte_stream.cc
index cb15a71a91..16b85ca0db 100644
--- a/src/core/lib/transport/byte_stream.cc
+++ b/src/core/lib/transport/byte_stream.cc
@@ -45,7 +45,7 @@ SliceBufferByteStream::SliceBufferByteStream(grpc_slice_buffer* slice_buffer,
SliceBufferByteStream::~SliceBufferByteStream() {}
void SliceBufferByteStream::Orphan() {
- grpc_slice_buffer_destroy(&backing_buffer_);
+ grpc_slice_buffer_destroy_internal(&backing_buffer_);
GRPC_ERROR_UNREF(shutdown_error_);
// Note: We do not actually delete the object here, since
// SliceBufferByteStream is usually allocated as part of a larger