aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-07 15:54:59 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-07 15:54:59 -0700
commit56969c2784911583825d75212ca027806d1de056 (patch)
tree35ccaa433f197d7977533e24210ddefb7632a12e /src/core/lib
parentf0ba70a9ea92b6c7422f40206e763141c05eb281 (diff)
parent41630a29507c8dd5b6110f0397b346b7feab442b (diff)
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_stack.c16
-rw-r--r--src/core/lib/channel/channel_stack.h11
-rw-r--r--src/core/lib/channel/channel_stack_builder.c29
-rw-r--r--src/core/lib/channel/channel_stack_builder.h10
-rw-r--r--src/core/lib/channel/connected_channel.c92
-rw-r--r--src/core/lib/debug/stats_data.c139
-rw-r--r--src/core/lib/iomgr/call_combiner.c202
-rw-r--r--src/core/lib/iomgr/call_combiner.h121
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c46
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c1962
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.h28
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c1185
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.h28
-rw-r--r--src/core/lib/iomgr/ev_posix.c4
-rw-r--r--src/core/lib/iomgr/iomgr.c8
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c203
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c91
-rw-r--r--src/core/lib/support/string.c13
-rw-r--r--src/core/lib/support/string.h3
-rw-r--r--src/core/lib/surface/call.c154
-rw-r--r--src/core/lib/surface/call.h10
-rw-r--r--src/core/lib/surface/call_log_batch.c2
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/surface/lame_client.cc19
-rw-r--r--src/core/lib/surface/server.c2
-rw-r--r--src/core/lib/transport/metadata_batch.c2
-rw-r--r--src/core/lib/transport/metadata_batch.h1
-rw-r--r--src/core/lib/transport/static_metadata.c25
-rw-r--r--src/core/lib/transport/static_metadata.h2
-rw-r--r--src/core/lib/transport/transport.c21
-rw-r--r--src/core/lib/transport/transport.h13
-rw-r--r--src/core/lib/transport/transport_impl.h3
-rw-r--r--src/core/lib/transport/transport_op_string.c7
34 files changed, 884 insertions, 3572 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 0f8e33c4be..775c8bc667 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -233,15 +233,10 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
grpc_call_element *next_elem = elem + 1;
+ GRPC_CALL_LOG_OP(GPR_INFO, next_elem, op);
next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op);
}
-char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- grpc_call_element *next_elem = elem + 1;
- return next_elem->filter->get_peer(exec_ctx, next_elem);
-}
-
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
const grpc_channel_info *channel_info) {
@@ -265,12 +260,3 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
sizeof(grpc_call_stack)));
}
-
-void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL);
- op->cancel_stream = true;
- op->payload->cancel_stream.cancel_error = error;
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
-}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index a80f8aa826..ae1cac31f7 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -40,6 +40,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/transport.h"
@@ -71,6 +72,7 @@ typedef struct {
gpr_timespec start_time;
gpr_timespec deadline;
gpr_arena *arena;
+ grpc_call_combiner *call_combiner;
} grpc_call_element_args;
typedef struct {
@@ -150,9 +152,6 @@ typedef struct {
void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem);
- /* Implement grpc_call_get_peer() */
- char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
-
/* Implement grpc_channel_get_info() */
void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
const grpc_channel_info *channel_info);
@@ -271,8 +270,6 @@ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
stack */
void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_transport_op *op);
-/* Pass through a request to get_peer to the next child element */
-char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
/* Pass through a request to get_channel_info() to the next child element */
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -288,10 +285,6 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op);
-void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem,
- grpc_error *error);
-
extern grpc_tracer_flag grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c
index c369e33073..7f2b8e07ce 100644
--- a/src/core/lib/channel/channel_stack_builder.c
+++ b/src/core/lib/channel/channel_stack_builder.c
@@ -124,6 +124,20 @@ bool grpc_channel_stack_builder_move_prev(
return true;
}
+grpc_channel_stack_builder_iterator *grpc_channel_stack_builder_iterator_find(
+ grpc_channel_stack_builder *builder, const char *filter_name) {
+ GPR_ASSERT(filter_name != NULL);
+ grpc_channel_stack_builder_iterator *it =
+ grpc_channel_stack_builder_create_iterator_at_first(builder);
+ while (grpc_channel_stack_builder_move_next(it)) {
+ if (grpc_channel_stack_builder_iterator_is_end(it)) break;
+ const char *filter_name_at_it =
+ grpc_channel_stack_builder_iterator_filter_name(it);
+ if (strcmp(filter_name, filter_name_at_it) == 0) break;
+ }
+ return it;
+}
+
bool grpc_channel_stack_builder_move_prev(
grpc_channel_stack_builder_iterator *iterator);
@@ -169,6 +183,21 @@ bool grpc_channel_stack_builder_append_filter(
return ok;
}
+bool grpc_channel_stack_builder_remove_filter(
+ grpc_channel_stack_builder *builder, const char *filter_name) {
+ grpc_channel_stack_builder_iterator *it =
+ grpc_channel_stack_builder_iterator_find(builder, filter_name);
+ if (grpc_channel_stack_builder_iterator_is_end(it)) {
+ grpc_channel_stack_builder_iterator_destroy(it);
+ return false;
+ }
+ it->node->prev->next = it->node->next;
+ it->node->next->prev = it->node->prev;
+ gpr_free(it->node);
+ grpc_channel_stack_builder_iterator_destroy(it);
+ return true;
+}
+
bool grpc_channel_stack_builder_prepend_filter(
grpc_channel_stack_builder *builder, const grpc_channel_filter *filter,
grpc_post_filter_create_init_func post_init_func, void *user_data) {
diff --git a/src/core/lib/channel/channel_stack_builder.h b/src/core/lib/channel/channel_stack_builder.h
index d43e427962..fdff2a2b6d 100644
--- a/src/core/lib/channel/channel_stack_builder.h
+++ b/src/core/lib/channel/channel_stack_builder.h
@@ -95,6 +95,11 @@ bool grpc_channel_stack_builder_move_next(
bool grpc_channel_stack_builder_move_prev(
grpc_channel_stack_builder_iterator *iterator);
+/// Return an iterator at \a filter_name, or at the end of the list if not
+/// found.
+grpc_channel_stack_builder_iterator *grpc_channel_stack_builder_iterator_find(
+ grpc_channel_stack_builder *builder, const char *filter_name);
+
typedef void (*grpc_post_filter_create_init_func)(
grpc_channel_stack *channel_stack, grpc_channel_element *elem, void *arg);
@@ -132,6 +137,11 @@ bool grpc_channel_stack_builder_append_filter(
grpc_post_filter_create_init_func post_init_func,
void *user_data) GRPC_MUST_USE_RESULT;
+/// Remove any filter whose name is \a filter_name from \a builder. Returns true
+/// if \a filter_name was not found.
+bool grpc_channel_stack_builder_remove_filter(
+ grpc_channel_stack_builder *builder, const char *filter_name);
+
/// Terminate iteration and destroy \a iterator
void grpc_channel_stack_builder_iterator_destroy(
grpc_channel_stack_builder_iterator *iterator);
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index af06ca802e..8285226fc4 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -36,7 +36,57 @@ typedef struct connected_channel_channel_data {
grpc_transport *transport;
} channel_data;
-typedef struct connected_channel_call_data { void *unused; } call_data;
+typedef struct {
+ grpc_closure closure;
+ grpc_closure *original_closure;
+ grpc_call_combiner *call_combiner;
+ const char *reason;
+} callback_state;
+
+typedef struct connected_channel_call_data {
+ grpc_call_combiner *call_combiner;
+ // Closures used for returning results on the call combiner.
+ callback_state on_complete[6]; // Max number of pending batches.
+ callback_state recv_initial_metadata_ready;
+ callback_state recv_message_ready;
+} call_data;
+
+static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ callback_state *state = (callback_state *)arg;
+ GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner,
+ state->original_closure, GRPC_ERROR_REF(error),
+ state->reason);
+}
+
+static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ run_in_call_combiner(exec_ctx, arg, error);
+ gpr_free(arg);
+}
+
+static void intercept_callback(call_data *calld, callback_state *state,
+ bool free_when_done, const char *reason,
+ grpc_closure **original_closure) {
+ state->original_closure = *original_closure;
+ state->call_combiner = calld->call_combiner;
+ state->reason = reason;
+ *original_closure = GRPC_CLOSURE_INIT(
+ &state->closure,
+ free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
+ state, grpc_schedule_on_exec_ctx);
+}
+
+static callback_state *get_state_for_batch(
+ call_data *calld, grpc_transport_stream_op_batch *batch) {
+ if (batch->send_initial_metadata) return &calld->on_complete[0];
+ if (batch->send_message) return &calld->on_complete[1];
+ if (batch->send_trailing_metadata) return &calld->on_complete[2];
+ if (batch->recv_initial_metadata) return &calld->on_complete[3];
+ if (batch->recv_message) return &calld->on_complete[4];
+ if (batch->recv_trailing_metadata) return &calld->on_complete[5];
+ GPR_UNREACHABLE_CODE(return NULL);
+}
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -49,13 +99,38 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
into transport stream operations */
static void con_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
+ if (batch->recv_initial_metadata) {
+ callback_state *state = &calld->recv_initial_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_initial_metadata_ready",
+ &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
+ }
+ if (batch->recv_message) {
+ callback_state *state = &calld->recv_message_ready;
+ intercept_callback(calld, state, false, "recv_message_ready",
+ &batch->payload->recv_message.recv_message_ready);
+ }
+ if (batch->cancel_stream) {
+ // There can be more than one cancellation batch in flight at any
+ // given time, so we can't just pick out a fixed index into
+ // calld->on_complete like we can for the other ops. However,
+ // cancellation isn't in the fast path, so we just allocate a new
+ // closure for each one.
+ callback_state *state = (callback_state *)gpr_malloc(sizeof(*state));
+ intercept_callback(calld, state, true, "on_complete (cancel_stream)",
+ &batch->on_complete);
+ } else {
+ callback_state *state = get_state_for_batch(calld, batch);
+ intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
+ }
grpc_transport_perform_stream_op(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ batch);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "passed batch to transport");
}
static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -71,6 +146,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ calld->call_combiner = args->call_combiner;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data, args->arena);
@@ -118,11 +194,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
}
-static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_transport_get_peer(exec_ctx, chand->transport);
-}
-
/* No-op. */
static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -138,7 +209,6 @@ const grpc_channel_filter grpc_connected_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- con_get_peer,
con_get_channel_info,
"connected",
};
diff --git a/src/core/lib/debug/stats_data.c b/src/core/lib/debug/stats_data.c
index 310eb40044..81ef0d28ee 100644
--- a/src/core/lib/debug/stats_data.c
+++ b/src/core/lib/debug/stats_data.c
@@ -60,70 +60,71 @@ const char *grpc_stats_histogram_name[GRPC_STATS_HISTOGRAM_COUNT] = {
"tcp_write_iov_size", "tcp_read_size", "tcp_read_offer",
"tcp_read_iov_size", "http2_send_message_size",
};
-const int grpc_stats_table_0[64] = {
- 0, 1, 2, 3, 4, 5, 7, 9, 11, 14,
- 17, 21, 26, 32, 39, 47, 57, 69, 83, 100,
- 120, 144, 173, 207, 248, 297, 356, 426, 510, 610,
- 730, 873, 1044, 1248, 1492, 1784, 2133, 2550, 3048, 3643,
- 4354, 5204, 6219, 7432, 8882, 10615, 12685, 15159, 18115, 21648,
- 25870, 30915, 36944, 44148, 52757, 63044, 75337, 90027, 107581, 128558,
- 153625, 183579, 219373, 262144};
+const int grpc_stats_table_0[65] = {
+ 0, 1, 2, 3, 4, 5, 7, 9, 11, 14,
+ 17, 21, 26, 32, 39, 47, 57, 68, 82, 98,
+ 117, 140, 167, 199, 238, 284, 339, 404, 482, 575,
+ 685, 816, 972, 1158, 1380, 1644, 1959, 2334, 2780, 3312,
+ 3945, 4699, 5597, 6667, 7941, 9459, 11267, 13420, 15984, 19038,
+ 22676, 27009, 32169, 38315, 45635, 54353, 64737, 77104, 91834, 109378,
+ 130273, 155159, 184799, 220100, 262144};
const uint8_t grpc_stats_table_1[124] = {
0, 0, 0, 1, 1, 1, 2, 2, 3, 3, 3, 4, 4, 5, 5, 6, 6, 6,
- 7, 7, 7, 8, 9, 9, 10, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 14,
- 15, 15, 16, 17, 17, 18, 18, 18, 19, 19, 20, 20, 21, 21, 22, 22, 23, 23,
- 24, 24, 25, 25, 26, 26, 26, 27, 28, 28, 29, 29, 30, 30, 30, 31, 32, 32,
- 33, 33, 33, 34, 34, 35, 35, 36, 36, 37, 37, 38, 38, 39, 39, 40, 40, 41,
- 41, 42, 42, 43, 43, 44, 44, 45, 45, 45, 46, 46, 47, 48, 48, 49, 49, 49,
- 50, 50, 51, 52, 52, 52, 53, 53, 54, 54, 55, 55, 56, 56, 57, 57};
-const int grpc_stats_table_2[128] = {
+ 7, 7, 7, 8, 9, 9, 10, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15,
+ 15, 16, 16, 17, 17, 18, 18, 19, 19, 20, 20, 21, 21, 22, 22, 22, 23, 24,
+ 24, 25, 25, 26, 26, 26, 27, 27, 28, 29, 29, 30, 30, 30, 31, 31, 32, 33,
+ 33, 34, 34, 34, 35, 35, 36, 37, 37, 37, 38, 38, 39, 39, 40, 40, 41, 41,
+ 42, 42, 43, 43, 44, 44, 45, 45, 46, 46, 47, 47, 48, 48, 49, 49, 50, 50,
+ 51, 51, 52, 52, 53, 53, 54, 54, 55, 55, 56, 56, 57, 57, 58, 58};
+const int grpc_stats_table_2[129] = {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14,
15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 30,
- 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 61,
- 64, 67, 70, 73, 76, 79, 82, 85, 88, 91, 95, 99, 103, 107, 111,
- 115, 119, 123, 128, 133, 138, 143, 148, 153, 159, 165, 171, 177, 183, 189,
- 196, 203, 210, 217, 224, 232, 240, 248, 256, 265, 274, 283, 293, 303, 313,
- 323, 334, 345, 356, 368, 380, 392, 405, 418, 432, 446, 460, 475, 490, 506,
- 522, 539, 556, 574, 592, 611, 630, 650, 670, 691, 713, 735, 758, 782, 806,
- 831, 857, 883, 910, 938, 966, 995, 1024};
+ 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60,
+ 63, 66, 69, 72, 75, 78, 81, 84, 87, 90, 94, 98, 102, 106, 110,
+ 114, 118, 122, 126, 131, 136, 141, 146, 151, 156, 162, 168, 174, 180, 186,
+ 192, 199, 206, 213, 220, 228, 236, 244, 252, 260, 269, 278, 287, 297, 307,
+ 317, 327, 338, 349, 360, 372, 384, 396, 409, 422, 436, 450, 464, 479, 494,
+ 510, 526, 543, 560, 578, 596, 615, 634, 654, 674, 695, 717, 739, 762, 785,
+ 809, 834, 859, 885, 912, 939, 967, 996, 1024};
const uint8_t grpc_stats_table_3[166] = {
0, 0, 0, 1, 1, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7,
- 8, 8, 9, 9, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15, 15, 15, 16, 16,
- 16, 17, 18, 18, 19, 20, 20, 21, 22, 22, 23, 24, 24, 25, 26, 26, 27, 27, 28,
- 28, 29, 29, 30, 30, 31, 31, 32, 32, 33, 33, 34, 34, 34, 35, 36, 37, 38, 38,
- 39, 40, 41, 41, 42, 43, 43, 44, 45, 45, 46, 46, 47, 48, 48, 49, 49, 50, 50,
- 51, 51, 52, 52, 53, 53, 54, 54, 55, 56, 57, 58, 59, 60, 60, 61, 62, 63, 63,
- 64, 65, 65, 66, 67, 67, 68, 69, 69, 70, 70, 71, 72, 72, 73, 73, 74, 74, 75,
- 75, 76, 77, 78, 79, 80, 80, 81, 82, 83, 84, 85, 85, 86, 87, 88, 88, 89, 90,
- 90, 91, 92, 92, 93, 93, 94, 95, 95, 96, 96, 97, 97, 98};
-const int grpc_stats_table_4[64] = {
- 0, 1, 2, 3, 4, 6, 8, 11,
- 15, 20, 26, 34, 44, 57, 74, 96,
- 124, 160, 206, 265, 341, 439, 565, 727,
- 935, 1202, 1546, 1988, 2556, 3286, 4225, 5432,
- 6983, 8977, 11540, 14834, 19069, 24513, 31510, 40505,
- 52067, 66929, 86033, 110590, 142157, 182734, 234893, 301940,
- 388125, 498910, 641316, 824370, 1059674, 1362141, 1750943, 2250722,
- 2893155, 3718960, 4780478, 6144988, 7898976, 10153611, 13051794, 16777216};
+ 8, 8, 9, 9, 10, 10, 11, 11, 12, 12, 13, 13, 14, 14, 15, 15, 16, 16, 16,
+ 17, 17, 18, 19, 19, 20, 21, 21, 22, 23, 23, 24, 25, 25, 26, 26, 27, 27, 28,
+ 28, 29, 29, 30, 30, 31, 31, 32, 32, 33, 33, 34, 34, 35, 36, 36, 37, 38, 39,
+ 40, 40, 41, 42, 42, 43, 44, 44, 45, 46, 46, 47, 48, 48, 49, 49, 50, 50, 51,
+ 51, 52, 52, 53, 53, 54, 54, 55, 56, 57, 58, 59, 59, 60, 61, 62, 63, 63, 64,
+ 65, 65, 66, 67, 67, 68, 69, 69, 70, 71, 71, 72, 72, 73, 73, 74, 75, 75, 76,
+ 76, 77, 78, 79, 79, 80, 81, 82, 83, 84, 85, 85, 86, 87, 88, 88, 89, 90, 90,
+ 91, 92, 92, 93, 94, 94, 95, 95, 96, 97, 97, 98, 98, 99};
+const int grpc_stats_table_4[65] = {
+ 0, 1, 2, 3, 4, 6, 8, 11,
+ 15, 20, 26, 34, 44, 57, 73, 94,
+ 121, 155, 199, 255, 327, 419, 537, 688,
+ 881, 1128, 1444, 1848, 2365, 3026, 3872, 4954,
+ 6338, 8108, 10373, 13270, 16976, 21717, 27782, 35541,
+ 45467, 58165, 74409, 95189, 121772, 155778, 199281, 254933,
+ 326126, 417200, 533707, 682750, 873414, 1117323, 1429345, 1828502,
+ 2339127, 2992348, 3827987, 4896985, 6264509, 8013925, 10251880, 13114801,
+ 16777216};
const uint8_t grpc_stats_table_5[87] = {
- 0, 0, 1, 1, 2, 3, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 10,
- 11, 12, 12, 13, 14, 14, 15, 16, 17, 17, 18, 19, 19, 20, 21, 21, 22, 23,
- 24, 24, 25, 25, 26, 27, 28, 28, 29, 30, 30, 31, 32, 33, 33, 34, 35, 35,
- 36, 36, 37, 38, 39, 39, 40, 41, 41, 42, 43, 44, 44, 45, 46, 46, 47, 47,
- 48, 49, 50, 50, 51, 52, 52, 53, 54, 55, 55, 56, 57, 57, 58};
-const int grpc_stats_table_6[64] = {
- 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13,
- 15, 17, 19, 21, 23, 25, 28, 31, 34, 37, 41, 45, 49,
- 54, 59, 64, 70, 76, 83, 90, 98, 106, 115, 125, 136, 147,
- 159, 172, 186, 201, 218, 236, 255, 276, 299, 323, 349, 377, 408,
- 441, 477, 515, 556, 601, 649, 701, 757, 817, 881, 950, 1024};
-const uint8_t grpc_stats_table_7[104] = {
- 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6,
- 7, 7, 7, 8, 8, 8, 9, 9, 10, 11, 11, 12, 12, 13, 13, 14, 14, 14,
- 15, 15, 16, 16, 16, 17, 18, 18, 19, 20, 20, 21, 21, 22, 22, 23, 23, 24,
- 24, 24, 25, 25, 26, 27, 28, 28, 29, 29, 30, 30, 31, 31, 32, 32, 33, 33,
- 34, 34, 35, 36, 36, 37, 38, 38, 39, 39, 40, 40, 41, 41, 42, 42, 42, 43,
- 44, 45, 45, 46, 47, 47, 48, 48, 49, 49, 50, 50, 51, 51};
+ 0, 0, 1, 1, 2, 3, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 11,
+ 11, 12, 13, 13, 14, 15, 15, 16, 17, 17, 18, 19, 20, 20, 21, 22, 22, 23,
+ 24, 25, 25, 26, 27, 27, 28, 29, 29, 30, 31, 31, 32, 33, 34, 34, 35, 36,
+ 36, 37, 38, 39, 39, 40, 41, 41, 42, 43, 44, 44, 45, 45, 46, 47, 48, 48,
+ 49, 50, 51, 51, 52, 53, 53, 54, 55, 56, 56, 57, 58, 58, 59};
+const int grpc_stats_table_6[65] = {
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
+ 14, 16, 18, 20, 22, 24, 27, 30, 33, 36, 39, 43, 47,
+ 51, 56, 61, 66, 72, 78, 85, 92, 100, 109, 118, 128, 139,
+ 151, 164, 178, 193, 209, 226, 244, 264, 285, 308, 333, 359, 387,
+ 418, 451, 486, 524, 565, 609, 656, 707, 762, 821, 884, 952, 1024};
+const uint8_t grpc_stats_table_7[102] = {
+ 0, 0, 0, 1, 1, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6,
+ 6, 7, 7, 7, 8, 8, 9, 9, 10, 11, 11, 12, 12, 13, 13, 14, 14,
+ 14, 15, 15, 16, 16, 17, 17, 18, 19, 19, 20, 20, 21, 21, 22, 22, 23,
+ 23, 24, 24, 24, 25, 26, 27, 27, 28, 28, 29, 29, 30, 30, 31, 31, 32,
+ 32, 33, 33, 34, 35, 35, 36, 37, 37, 38, 38, 39, 39, 40, 40, 41, 41,
+ 42, 42, 43, 44, 44, 45, 46, 46, 47, 48, 48, 49, 49, 50, 50, 51, 51};
void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) {
value = GPR_CLAMP(value, 0, 262144);
if (value < 6) {
@@ -136,7 +137,7 @@ void grpc_stats_inc_call_initial_size(grpc_exec_ctx *exec_ctx, int value) {
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4650529565213458432ull) {
+ if (_val.uint < 4651092515166879744ull) {
int bucket =
grpc_stats_table_1[((_val.uint - 4618441417868443648ull) >> 49)] + 6;
_bkt.dbl = grpc_stats_table_0[bucket];
@@ -161,7 +162,7 @@ void grpc_stats_inc_poll_events_returned(grpc_exec_ctx *exec_ctx, int value) {
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4642648265865560064ull) {
+ if (_val.uint < 4642789003353915392ull) {
int bucket =
grpc_stats_table_3[((_val.uint - 4628855992006737920ull) >> 47)] + 29;
_bkt.dbl = grpc_stats_table_2[bucket];
@@ -187,7 +188,7 @@ void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) {
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4682617712558473216ull) {
+ if (_val.uint < 4683743612465315840ull) {
int bucket =
grpc_stats_table_5[((_val.uint - 4617315517961601024ull) >> 50)] + 5;
_bkt.dbl = grpc_stats_table_4[bucket];
@@ -202,7 +203,7 @@ void grpc_stats_inc_tcp_write_size(grpc_exec_ctx *exec_ctx, int value) {
}
void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) {
value = GPR_CLAMP(value, 0, 1024);
- if (value < 12) {
+ if (value < 13) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx),
GRPC_STATS_HISTOGRAM_TCP_WRITE_IOV_SIZE, value);
return;
@@ -212,9 +213,9 @@ void grpc_stats_inc_tcp_write_iov_size(grpc_exec_ctx *exec_ctx, int value) {
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4637300241308057600ull) {
+ if (_val.uint < 4637863191261478912ull) {
int bucket =
- grpc_stats_table_7[((_val.uint - 4622945017495814144ull) >> 48)] + 12;
+ grpc_stats_table_7[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
_bkt.dbl = grpc_stats_table_6[bucket];
bucket -= (_val.uint < _bkt.uint);
GRPC_STATS_INC_HISTOGRAM((exec_ctx),
@@ -237,7 +238,7 @@ void grpc_stats_inc_tcp_read_size(grpc_exec_ctx *exec_ctx, int value) {
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4682617712558473216ull) {
+ if (_val.uint < 4683743612465315840ull) {
int bucket =
grpc_stats_table_5[((_val.uint - 4617315517961601024ull) >> 50)] + 5;
_bkt.dbl = grpc_stats_table_4[bucket];
@@ -262,7 +263,7 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) {
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4682617712558473216ull) {
+ if (_val.uint < 4683743612465315840ull) {
int bucket =
grpc_stats_table_5[((_val.uint - 4617315517961601024ull) >> 50)] + 5;
_bkt.dbl = grpc_stats_table_4[bucket];
@@ -277,7 +278,7 @@ void grpc_stats_inc_tcp_read_offer(grpc_exec_ctx *exec_ctx, int value) {
}
void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) {
value = GPR_CLAMP(value, 0, 1024);
- if (value < 12) {
+ if (value < 13) {
GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
value);
return;
@@ -287,9 +288,9 @@ void grpc_stats_inc_tcp_read_iov_size(grpc_exec_ctx *exec_ctx, int value) {
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4637300241308057600ull) {
+ if (_val.uint < 4637863191261478912ull) {
int bucket =
- grpc_stats_table_7[((_val.uint - 4622945017495814144ull) >> 48)] + 12;
+ grpc_stats_table_7[((_val.uint - 4623507967449235456ull) >> 48)] + 13;
_bkt.dbl = grpc_stats_table_6[bucket];
bucket -= (_val.uint < _bkt.uint);
GRPC_STATS_INC_HISTOGRAM((exec_ctx), GRPC_STATS_HISTOGRAM_TCP_READ_IOV_SIZE,
@@ -313,7 +314,7 @@ void grpc_stats_inc_http2_send_message_size(grpc_exec_ctx *exec_ctx,
uint64_t uint;
} _val, _bkt;
_val.dbl = value;
- if (_val.uint < 4682617712558473216ull) {
+ if (_val.uint < 4683743612465315840ull) {
int bucket =
grpc_stats_table_5[((_val.uint - 4617315517961601024ull) >> 50)] + 5;
_bkt.dbl = grpc_stats_table_4[bucket];
diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c
new file mode 100644
index 0000000000..48d8eaec18
--- /dev/null
+++ b/src/core/lib/iomgr/call_combiner.c
@@ -0,0 +1,202 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/call_combiner.h"
+
+#include <grpc/support/log.h>
+
+grpc_tracer_flag grpc_call_combiner_trace =
+ GRPC_TRACER_INITIALIZER(false, "call_combiner");
+
+static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
+ if (cancel_state & 1) {
+ return (grpc_error*)(cancel_state & ~(gpr_atm)1);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static gpr_atm encode_cancel_state_error(grpc_error* error) {
+ return (gpr_atm)1 | (gpr_atm)error;
+}
+
+void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
+ gpr_mpscq_init(&call_combiner->queue);
+}
+
+void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
+ gpr_mpscq_destroy(&call_combiner->queue);
+ GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
+}
+
+#ifndef NDEBUG
+#define DEBUG_ARGS , const char *file, int line
+#define DEBUG_FMT_STR "%s:%d: "
+#define DEBUG_FMT_ARGS , file, line
+#else
+#define DEBUG_ARGS
+#define DEBUG_FMT_STR
+#define DEBUG_FMT_ARGS
+#endif
+
+void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure,
+ grpc_error* error DEBUG_ARGS,
+ const char* reason) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
+ "%s] error=%s",
+ call_combiner, closure DEBUG_FMT_ARGS, reason,
+ grpc_error_string(error));
+ }
+ size_t prev_size =
+ (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
+ prev_size + 1);
+ }
+ if (prev_size == 0) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
+ }
+ // Queue was empty, so execute this closure immediately.
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
+ } else {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_INFO, " QUEUING");
+ }
+ // Queue was not empty, so add closure to queue.
+ closure->error_data.error = error;
+ gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
+ }
+}
+
+void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner DEBUG_ARGS,
+ const char* reason) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
+ call_combiner DEBUG_FMT_ARGS, reason);
+ }
+ size_t prev_size =
+ (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
+ prev_size - 1);
+ }
+ GPR_ASSERT(prev_size >= 1);
+ if (prev_size > 1) {
+ while (true) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " checking queue");
+ }
+ bool empty;
+ grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end(
+ &call_combiner->queue, &empty);
+ if (closure == NULL) {
+ // This can happen either due to a race condition within the mpscq
+ // code or because of a race with grpc_call_combiner_start().
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " queue returned no result; checking again");
+ }
+ continue;
+ }
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
+ closure, grpc_error_string(closure->error_data.error));
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
+ break;
+ }
+ } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " queue empty");
+ }
+}
+
+void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure) {
+ while (true) {
+ // Decode original state.
+ gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
+ grpc_error* original_error = decode_cancel_state_error(original_state);
+ // If error is set, invoke the cancellation closure immediately.
+ // Otherwise, store the new closure.
+ if (original_error != GRPC_ERROR_NONE) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "call_combiner=%p: scheduling notify_on_cancel callback=%p "
+ "for pre-existing cancellation",
+ call_combiner, closure);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error));
+ break;
+ } else {
+ if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
+ (gpr_atm)closure)) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, "call_combiner=%p: setting notify_on_cancel=%p",
+ call_combiner, closure);
+ }
+ // If we replaced an earlier closure, invoke the original
+ // closure with GRPC_ERROR_NONE. This allows callers to clean
+ // up any resources they may be holding for the callback.
+ if (original_state != 0) {
+ closure = (grpc_closure*)original_state;
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "call_combiner=%p: scheduling old cancel callback=%p",
+ call_combiner, closure);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
+ }
+ break;
+ }
+ }
+ // cas failed, try again.
+ }
+}
+
+void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_error* error) {
+ while (true) {
+ gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
+ grpc_error* original_error = decode_cancel_state_error(original_state);
+ if (original_error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(error);
+ break;
+ }
+ if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
+ encode_cancel_state_error(error))) {
+ if (original_state != 0) {
+ grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "call_combiner=%p: scheduling notify_on_cancel callback=%p",
+ call_combiner, notify_on_cancel);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error));
+ }
+ break;
+ }
+ // cas failed, try again.
+ }
+}
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
new file mode 100644
index 0000000000..5cfb3f0c07
--- /dev/null
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -0,0 +1,121 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
+#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
+
+#include <stddef.h>
+
+#include <grpc/support/atm.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/mpscq.h"
+
+// A simple, lock-free mechanism for serializing activity related to a
+// single call. This is similar to a combiner but is more lightweight.
+//
+// It requires the callback (or, in the common case where the callback
+// actually kicks off a chain of callbacks, the last callback in that
+// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
+// when it is done with the action that was kicked off by the original
+// callback.
+
+extern grpc_tracer_flag grpc_call_combiner_trace;
+
+typedef struct {
+ gpr_atm size; // size_t, num closures in queue or currently executing
+ gpr_mpscq queue;
+ // Either 0 (if not cancelled and no cancellation closure set),
+ // a grpc_closure* (if the lowest bit is 0),
+ // or a grpc_error* (if the lowest bit is 1).
+ gpr_atm cancel_state;
+} grpc_call_combiner;
+
+// Assumes memory was initialized to zero.
+void grpc_call_combiner_init(grpc_call_combiner* call_combiner);
+
+void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner);
+
+#ifndef NDEBUG
+#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
+ reason) \
+ grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
+ __FILE__, __LINE__, (reason))
+#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
+ grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \
+ (reason))
+/// Starts processing \a closure on \a call_combiner.
+void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure, grpc_error* error,
+ const char* file, int line, const char* reason);
+/// Yields the call combiner to the next closure in the queue, if any.
+void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ const char* file, int line, const char* reason);
+#else
+#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
+ reason) \
+ grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
+ (reason))
+#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
+ grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason))
+/// Starts processing \a closure on \a call_combiner.
+void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure, grpc_error* error,
+ const char* reason);
+/// Yields the call combiner to the next closure in the queue, if any.
+void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ const char* reason);
+#endif
+
+/// Registers \a closure to be invoked by \a call_combiner when
+/// grpc_call_combiner_cancel() is called.
+///
+/// Once a closure is registered, it will always be scheduled exactly
+/// once; this allows the closure to hold references that will be freed
+/// regardless of whether or not the call was cancelled. If a cancellation
+/// does occur, the closure will be scheduled with the cancellation error;
+/// otherwise, it will be scheduled with GRPC_ERROR_NONE.
+///
+/// The closure will be scheduled in the following cases:
+/// - If grpc_call_combiner_cancel() was called prior to registering the
+/// closure, it will be scheduled immediately with the cancelation error.
+/// - If grpc_call_combiner_cancel() is called after registering the
+/// closure, the closure will be scheduled with the cancellation error.
+/// - If grpc_call_combiner_set_notify_on_cancel() is called again to
+/// register a new cancellation closure, the previous cancellation
+/// closure will be scheduled with GRPC_ERROR_NONE.
+///
+/// If \a closure is NULL, then no closure will be invoked on
+/// cancellation; this effectively unregisters the previously set closure.
+/// However, most filters will not need to explicitly unregister their
+/// callbacks, as this is done automatically when the call is destroyed.
+void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure);
+
+/// Indicates that the call has been cancelled.
+void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_error* error);
+
+#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 6a3c2654a8..ef5dfd4429 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -702,22 +702,30 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_mu_unlock(&pollset->mu);
goto retry_lock_neighbourhood;
}
- pollset->seen_inactive = false;
- if (neighbourhood->active_root == NULL) {
- neighbourhood->active_root = pollset->next = pollset->prev = pollset;
- /* TODO: sreek. Why would this worker state be other than UNKICKED
- * here ? (since the worker isn't added to the pollset yet, there is no
- * way it can be "found" by other threads to get kicked). */
-
- /* If there is no designated poller, make this the designated poller */
- if (worker->kick_state == UNKICKED &&
- gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
- SET_KICK_STATE(worker, DESIGNATED_POLLER);
+
+ /* In the brief time we released the pollset locks above, the worker MAY
+ have been kicked. In this case, the worker should get out of this
+ pollset ASAP and hence this should neither add the pollset to
+ neighbourhood nor mark the pollset as active.
+
+ On a side note, the only way a worker's kick state could have changed
+ at this point is if it were "kicked specifically". Since the worker has
+ not added itself to the pollset yet (by calling worker_insert()), it is
+ not visible in the "kick any" path yet */
+ if (worker->kick_state == UNKICKED) {
+ pollset->seen_inactive = false;
+ if (neighbourhood->active_root == NULL) {
+ neighbourhood->active_root = pollset->next = pollset->prev = pollset;
+ /* Make this the designated poller if there isn't one already */
+ if (worker->kick_state == UNKICKED &&
+ gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
+ SET_KICK_STATE(worker, DESIGNATED_POLLER);
+ }
+ } else {
+ pollset->next = neighbourhood->active_root;
+ pollset->prev = pollset->next->prev;
+ pollset->next->prev = pollset->prev->next = pollset;
}
- } else {
- pollset->next = neighbourhood->active_root;
- pollset->prev = pollset->next->prev;
- pollset->next->prev = pollset->prev->next = pollset;
}
}
if (is_reassigning) {
@@ -1006,6 +1014,7 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
gpr_log(GPR_ERROR, "%s", tmp);
gpr_free(tmp);
}
+
if (specific_worker == NULL) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
grpc_pollset_worker *root_worker = pollset->root_worker;
@@ -1090,8 +1099,11 @@ static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
goto done;
}
- } else if (specific_worker->kick_state == KICKED) {
- GRPC_STATS_INC_POLLSET_KICKED_AGAIN(exec_ctx);
+
+ GPR_UNREACHABLE_CODE(goto done);
+ }
+
+ if (specific_worker->kick_state == KICKED) {
if (GRPC_TRACER_ON(grpc_polling_trace)) {
gpr_log(GPR_ERROR, " .. specific worker already kicked");
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
deleted file mode 100644
index ee3250297f..0000000000
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ /dev/null
@@ -1,1962 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/port.h"
-
-/* This polling engine is only relevant on linux kernels supporting epoll() */
-#ifdef GRPC_LINUX_EPOLL
-
-#include "src/core/lib/iomgr/ev_epoll_limited_pollers_linux.h"
-
-#include <assert.h>
-#include <errno.h>
-#include <limits.h>
-#include <poll.h>
-#include <pthread.h>
-#include <signal.h>
-#include <string.h>
-#include <sys/epoll.h>
-#include <sys/socket.h>
-#include <unistd.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/tls.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/debug/trace.h"
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/iomgr/iomgr_internal.h"
-#include "src/core/lib/iomgr/lockfree_event.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
-#include "src/core/lib/support/env.h"
-
-#define GRPC_POLLING_TRACE(fmt, ...) \
- if (GRPC_TRACER_ON(grpc_polling_trace)) { \
- gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
- }
-
-#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
-
-/* The maximum number of polling threads per polling island. By default no
- limit */
-static int g_max_pollers_per_pi = INT_MAX;
-
-static int grpc_wakeup_signal = -1;
-static bool is_grpc_wakeup_signal_initialized = false;
-
-/* Implements the function defined in grpc_posix.h. This function might be
- * called before even calling grpc_init() to set either a different signal to
- * use. If signum == -1, then the use of signals is disabled */
-static void grpc_use_signal(int signum) {
- grpc_wakeup_signal = signum;
- is_grpc_wakeup_signal_initialized = true;
-
- if (grpc_wakeup_signal < 0) {
- gpr_log(GPR_INFO,
- "Use of signals is disabled. Epoll engine will not be used");
- } else {
- gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
- grpc_wakeup_signal);
- }
-}
-
-struct polling_island;
-
-typedef enum {
- POLL_OBJ_FD,
- POLL_OBJ_POLLSET,
- POLL_OBJ_POLLSET_SET
-} poll_obj_type;
-
-typedef struct poll_obj {
-#ifndef NDEBUG
- poll_obj_type obj_type;
-#endif
- gpr_mu mu;
- struct polling_island *pi;
-} poll_obj;
-
-static const char *poll_obj_string(poll_obj_type po_type) {
- switch (po_type) {
- case POLL_OBJ_FD:
- return "fd";
- case POLL_OBJ_POLLSET:
- return "pollset";
- case POLL_OBJ_POLLSET_SET:
- return "pollset_set";
- }
-
- GPR_UNREACHABLE_CODE(return "UNKNOWN");
-}
-
-/*******************************************************************************
- * Fd Declarations
- */
-
-#define FD_FROM_PO(po) ((grpc_fd *)(po))
-
-struct grpc_fd {
- poll_obj po;
-
- int fd;
- /* refst format:
- bit 0 : 1=Active / 0=Orphaned
- bits 1-n : refcount
- Ref/Unref by two to avoid altering the orphaned bit */
- gpr_atm refst;
-
- /* The fd is either closed or we relinquished control of it. In either
- cases, this indicates that the 'fd' on this structure is no longer
- valid */
- bool orphaned;
-
- gpr_atm read_closure;
- gpr_atm write_closure;
-
- struct grpc_fd *freelist_next;
- grpc_closure *on_done_closure;
-
- /* The pollset that last noticed that the fd is readable. The actual type
- * stored in this is (grpc_pollset *) */
- gpr_atm read_notifier_pollset;
-
- grpc_iomgr_object iomgr_object;
-};
-
-/* Reference counting for fds */
-#ifndef NDEBUG
-static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
-static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
- int line);
-#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
-#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
-#else
-static void fd_ref(grpc_fd *fd);
-static void fd_unref(grpc_fd *fd);
-#define GRPC_FD_REF(fd, reason) fd_ref(fd)
-#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
-#endif
-
-static void fd_global_init(void);
-static void fd_global_shutdown(void);
-
-/*******************************************************************************
- * Polling island Declarations
- */
-
-#ifndef NDEBUG
-
-#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
-#define PI_UNREF(exec_ctx, p, r) \
- pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-
-#else
-
-#define PI_ADD_REF(p, r) pi_add_ref((p))
-#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p))
-
-#endif
-
-typedef struct worker_node {
- struct worker_node *next;
- struct worker_node *prev;
-} worker_node;
-
-/* This is also used as grpc_workqueue (by directly casing it) */
-typedef struct polling_island {
- gpr_mu mu;
- /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
- the refcount.
- Once the ref count becomes zero, this structure is destroyed which means
- we should ensure that there is never a scenario where a PI_ADD_REF() is
- racing with a PI_UNREF() that just made the ref_count zero. */
- gpr_atm ref_count;
-
- /* Pointer to the polling_island this merged into.
- * merged_to value is only set once in polling_island's lifetime (and that too
- * only if the island is merged with another island). Because of this, we can
- * use gpr_atm type here so that we can do atomic access on this and reduce
- * lock contention on 'mu' mutex.
- *
- * Note that if this field is not NULL (i.e not 0), all the remaining fields
- * (except mu and ref_count) are invalid and must be ignored. */
- gpr_atm merged_to;
-
- /* Number of threads currently polling on this island */
- gpr_atm poller_count;
-
- /* The list of workers waiting to do polling on this polling island */
- gpr_mu worker_list_mu;
- worker_node worker_list_head;
-
- /* The fd of the underlying epoll set */
- int epoll_fd;
-
- /* The file descriptors in the epoll set */
- size_t fd_cnt;
- size_t fd_capacity;
- grpc_fd **fds;
-} polling_island;
-
-/*******************************************************************************
- * Pollset Declarations
- */
-#define WORKER_FROM_WORKER_LIST_NODE(p) \
- (struct grpc_pollset_worker *)(((char *)(p)) - \
- offsetof(grpc_pollset_worker, pi_list_link))
-struct grpc_pollset_worker {
- /* Thread id of this worker */
- pthread_t pt_id;
-
- /* Used to prevent a worker from getting kicked multiple times */
- gpr_atm is_kicked;
-
- struct grpc_pollset_worker *next;
- struct grpc_pollset_worker *prev;
-
- /* Indicates if it is this worker's turn to do epoll */
- gpr_atm is_polling_turn;
-
- /* Node in the polling island's worker list. */
- worker_node pi_list_link;
-};
-
-struct grpc_pollset {
- poll_obj po;
-
- grpc_pollset_worker root_worker;
- bool kicked_without_pollers;
-
- bool shutting_down; /* Is the pollset shutting down ? */
- bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
- grpc_closure *shutdown_done; /* Called after after shutdown is complete */
-};
-
-/*******************************************************************************
- * Pollset-set Declarations
- */
-struct grpc_pollset_set {
- poll_obj po;
-};
-
-/*******************************************************************************
- * Common helpers
- */
-
-static bool append_error(grpc_error **composite, grpc_error *error,
- const char *desc) {
- if (error == GRPC_ERROR_NONE) return true;
- if (*composite == GRPC_ERROR_NONE) {
- *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
- }
- *composite = grpc_error_add_child(*composite, error);
- return false;
-}
-
-/*******************************************************************************
- * Polling island Definitions
- */
-
-/* The wakeup fd that is used to wake up all threads in a Polling island. This
- is useful in the polling island merge operation where we need to wakeup all
- the threads currently polling the smaller polling island (so that they can
- start polling the new/merged polling island)
-
- NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
- threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
-static grpc_wakeup_fd polling_island_wakeup_fd;
-
-/* The polling island being polled right now.
- See comments in workqueue_maybe_wakeup for why this is tracked. */
-static __thread polling_island *g_current_thread_polling_island;
-
-/* Forward declaration */
-static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
-
-#ifdef GRPC_TSAN
-/* Currently TSAN may incorrectly flag data races between epoll_ctl and
- epoll_wait for any grpc_fd structs that are added to the epoll set via
- epoll_ctl and are returned (within a very short window) via epoll_wait().
-
- To work-around this race, we establish a happens-before relation between
- the code just-before epoll_ctl() and the code after epoll_wait() by using
- this atomic */
-gpr_atm g_epoll_sync;
-#endif /* defined(GRPC_TSAN) */
-
-static void pi_add_ref(polling_island *pi);
-static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
-
-#ifndef NDEBUG
-static void pi_add_ref_dbg(polling_island *pi, const char *reason,
- const char *file, int line) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
- gpr_log(GPR_DEBUG, "Add ref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
- " (%s) - (%s, %d)",
- pi, old_cnt, old_cnt + 1, reason, file, line);
- }
- pi_add_ref(pi);
-}
-
-static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi,
- const char *reason, const char *file, int line) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
- gpr_log(GPR_DEBUG, "Unref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
- " (%s) - (%s, %d)",
- pi, old_cnt, (old_cnt - 1), reason, file, line);
- }
- pi_unref(exec_ctx, pi);
-}
-#endif
-
-static void pi_add_ref(polling_island *pi) {
- gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
-}
-
-static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) {
- /* If ref count went to zero, delete the polling island.
- Note that this deletion not be done under a lock. Once the ref count goes
- to zero, we are guaranteed that no one else holds a reference to the
- polling island (and that there is no racing pi_add_ref() call either).
-
- Also, if we are deleting the polling island and the merged_to field is
- non-empty, we should remove a ref to the merged_to polling island
- */
- if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
- polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- polling_island_delete(exec_ctx, pi);
- if (next != NULL) {
- PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */
- }
- }
-}
-
-static void worker_node_init(worker_node *node) {
- node->next = node->prev = node;
-}
-
-/* Not thread safe. Do under a list-level lock */
-static void push_back_worker_node(worker_node *head, worker_node *node) {
- node->next = head;
- node->prev = head->prev;
- head->prev->next = node;
- head->prev = node;
-}
-
-/* Not thread safe. Do under a list-level lock */
-static void remove_worker_node(worker_node *node) {
- node->next->prev = node->prev;
- node->prev->next = node->next;
- /* If node's next and prev point to itself, the node is considered detached
- * from the list*/
- node->next = node->prev = node;
-}
-
-/* Not thread safe. Do under a list-level lock */
-static worker_node *pop_front_worker_node(worker_node *head) {
- worker_node *node = head->next;
- if (node != head) {
- remove_worker_node(node);
- } else {
- node = NULL;
- }
-
- return node;
-}
-
-/* Returns true if the node's next and prev are pointing to itself (which
- indicates that the node is not in the list */
-static bool is_worker_node_detached(worker_node *node) {
- return (node->next == node->prev && node->next == node);
-}
-
-/* The caller is expected to hold pi->mu lock before calling this function
- */
-static void polling_island_add_fds_locked(polling_island *pi, grpc_fd **fds,
- size_t fd_count, bool add_fd_refs,
- grpc_error **error) {
- int err;
- size_t i;
- struct epoll_event ev;
- char *err_msg;
- const char *err_desc = "polling_island_add_fds";
-
-#ifdef GRPC_TSAN
- /* See the definition of g_epoll_sync for more context */
- gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
-#endif /* defined(GRPC_TSAN) */
-
- for (i = 0; i < fd_count; i++) {
- ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fds[i];
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);
-
- if (err < 0) {
- if (errno != EEXIST) {
- gpr_asprintf(
- &err_msg,
- "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
- pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
-
- continue;
- }
-
- if (pi->fd_cnt == pi->fd_capacity) {
- pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
- pi->fds = gpr_realloc(pi->fds, sizeof(grpc_fd *) * pi->fd_capacity);
- }
-
- pi->fds[pi->fd_cnt++] = fds[i];
- if (add_fd_refs) {
- GRPC_FD_REF(fds[i], "polling_island");
- }
- }
-}
-
-/* The caller is expected to hold pi->mu before calling this */
-static void polling_island_add_wakeup_fd_locked(polling_island *pi,
- grpc_wakeup_fd *wakeup_fd,
- grpc_error **error) {
- struct epoll_event ev;
- int err;
- char *err_msg;
- const char *err_desc = "polling_island_add_wakeup_fd";
-
- ev.events = (uint32_t)(EPOLLIN | EPOLLET);
- ev.data.ptr = wakeup_fd;
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
- GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
- if (err < 0 && errno != EEXIST) {
- gpr_asprintf(&err_msg,
- "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
- "error: %d (%s)",
- pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
- strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
-}
-
-/* The caller is expected to hold pi->mu lock before calling this function */
-static void polling_island_remove_all_fds_locked(polling_island *pi,
- bool remove_fd_refs,
- grpc_error **error) {
- int err;
- size_t i;
- char *err_msg;
- const char *err_desc = "polling_island_remove_fds";
-
- for (i = 0; i < pi->fd_cnt; i++) {
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, NULL);
- if (err < 0 && errno != ENOENT) {
- gpr_asprintf(&err_msg,
- "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
- "error: %d (%s)",
- pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
-
- if (remove_fd_refs) {
- GRPC_FD_UNREF(pi->fds[i], "polling_island");
- }
- }
-
- pi->fd_cnt = 0;
-}
-
-/* The caller is expected to hold pi->mu lock before calling this function */
-static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd,
- bool is_fd_closed,
- grpc_error **error) {
- int err;
- size_t i;
- char *err_msg;
- const char *err_desc = "polling_island_remove_fd";
-
- /* If fd is already closed, then it would have been automatically been removed
- from the epoll set */
- if (!is_fd_closed) {
- err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
- if (err < 0 && errno != ENOENT) {
- gpr_asprintf(
- &err_msg,
- "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
- pi->epoll_fd, fd->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
- }
-
- for (i = 0; i < pi->fd_cnt; i++) {
- if (pi->fds[i] == fd) {
- pi->fds[i] = pi->fds[--pi->fd_cnt];
- GRPC_FD_UNREF(fd, "polling_island");
- break;
- }
- }
-}
-
-/* Might return NULL in case of an error */
-static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
- grpc_fd *initial_fd,
- grpc_error **error) {
- polling_island *pi = NULL;
- const char *err_desc = "polling_island_create";
-
- *error = GRPC_ERROR_NONE;
-
- pi = gpr_malloc(sizeof(*pi));
- gpr_mu_init(&pi->mu);
- pi->fd_cnt = 0;
- pi->fd_capacity = 0;
- pi->fds = NULL;
- pi->epoll_fd = -1;
-
- gpr_atm_rel_store(&pi->ref_count, 0);
- gpr_atm_rel_store(&pi->poller_count, 0);
- gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
-
- gpr_mu_init(&pi->worker_list_mu);
- worker_node_init(&pi->worker_list_head);
-
- pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
-
- if (pi->epoll_fd < 0) {
- append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
- goto done;
- }
-
- if (initial_fd != NULL) {
- polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
- }
-
-done:
- if (*error != GRPC_ERROR_NONE) {
- polling_island_delete(exec_ctx, pi);
- pi = NULL;
- }
- return pi;
-}
-
-static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
- GPR_ASSERT(pi->fd_cnt == 0);
-
- if (pi->epoll_fd >= 0) {
- close(pi->epoll_fd);
- }
- gpr_mu_destroy(&pi->mu);
- gpr_mu_destroy(&pi->worker_list_mu);
- GPR_ASSERT(is_worker_node_detached(&pi->worker_list_head));
-
- gpr_free(pi->fds);
- gpr_free(pi);
-}
-
-/* Attempts to gets the last polling island in the linked list (liked by the
- * 'merged_to' field). Since this does not lock the polling island, there are no
- * guarantees that the island returned is the last island */
-static polling_island *polling_island_maybe_get_latest(polling_island *pi) {
- polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- while (next != NULL) {
- pi = next;
- next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- }
-
- return pi;
-}
-
-/* Gets the lock on the *latest* polling island i.e the last polling island in
- the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
- returned polling island's mu.
- Usage: To lock/unlock polling island "pi", do the following:
- polling_island *pi_latest = polling_island_lock(pi);
- ...
- ... critical section ..
- ...
- gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
-static polling_island *polling_island_lock(polling_island *pi) {
- polling_island *next = NULL;
-
- while (true) {
- next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- if (next == NULL) {
- /* Looks like 'pi' is the last node in the linked list but unless we check
- this by holding the pi->mu lock, we cannot be sure (i.e without the
- pi->mu lock, we don't prevent island merges).
- To be absolutely sure, check once more by holding the pi->mu lock */
- gpr_mu_lock(&pi->mu);
- next = (polling_island *)gpr_atm_acq_load(&pi->merged_to);
- if (next == NULL) {
- /* pi is infact the last node and we have the pi->mu lock. we're done */
- break;
- }
-
- /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
- * isn't the lock we are interested in. Continue traversing the list */
- gpr_mu_unlock(&pi->mu);
- }
-
- pi = next;
- }
-
- return pi;
-}
-
-/* Gets the lock on the *latest* polling islands in the linked lists pointed by
- *p and *q (and also updates *p and *q to point to the latest polling islands)
-
- This function is needed because calling the following block of code to obtain
- locks on polling islands (*p and *q) is prone to deadlocks.
- {
- polling_island_lock(*p, true);
- polling_island_lock(*q, true);
- }
-
- Usage/example:
- polling_island *p1;
- polling_island *p2;
- ..
- polling_island_lock_pair(&p1, &p2);
- ..
- .. Critical section with both p1 and p2 locked
- ..
- // Release locks: Always call polling_island_unlock_pair() to release locks
- polling_island_unlock_pair(p1, p2);
-*/
-static void polling_island_lock_pair(polling_island **p, polling_island **q) {
- polling_island *pi_1 = *p;
- polling_island *pi_2 = *q;
- polling_island *next_1 = NULL;
- polling_island *next_2 = NULL;
-
- /* The algorithm is simple:
- - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
- keep updating pi_1 and pi_2)
- - Then obtain locks on the islands by following a lock order rule of
- locking polling_island with lower address first
- Special case: Before obtaining the locks, check if pi_1 and pi_2 are
- pointing to the same island. If that is the case, we can just call
- polling_island_lock()
- - After obtaining both the locks, double check that the polling islands
- are still the last polling islands in their respective linked lists
- (this is because there might have been polling island merges before
- we got the lock)
- - If the polling islands are the last islands, we are done. If not,
- release the locks and continue the process from the first step */
- while (true) {
- next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
- while (next_1 != NULL) {
- pi_1 = next_1;
- next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
- }
-
- next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
- while (next_2 != NULL) {
- pi_2 = next_2;
- next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
- }
-
- if (pi_1 == pi_2) {
- pi_1 = pi_2 = polling_island_lock(pi_1);
- break;
- }
-
- if (pi_1 < pi_2) {
- gpr_mu_lock(&pi_1->mu);
- gpr_mu_lock(&pi_2->mu);
- } else {
- gpr_mu_lock(&pi_2->mu);
- gpr_mu_lock(&pi_1->mu);
- }
-
- next_1 = (polling_island *)gpr_atm_acq_load(&pi_1->merged_to);
- next_2 = (polling_island *)gpr_atm_acq_load(&pi_2->merged_to);
- if (next_1 == NULL && next_2 == NULL) {
- break;
- }
-
- gpr_mu_unlock(&pi_1->mu);
- gpr_mu_unlock(&pi_2->mu);
- }
-
- *p = pi_1;
- *q = pi_2;
-}
-
-static void polling_island_unlock_pair(polling_island *p, polling_island *q) {
- if (p == q) {
- gpr_mu_unlock(&p->mu);
- } else {
- gpr_mu_unlock(&p->mu);
- gpr_mu_unlock(&q->mu);
- }
-}
-
-static polling_island *polling_island_merge(polling_island *p,
- polling_island *q,
- grpc_error **error) {
- /* Get locks on both the polling islands */
- polling_island_lock_pair(&p, &q);
-
- if (p != q) {
- /* Make sure that p points to the polling island with fewer fds than q */
- if (p->fd_cnt > q->fd_cnt) {
- GPR_SWAP(polling_island *, p, q);
- }
-
- /* Merge p with q i.e move all the fds from p (The one with fewer fds) to q
- Note that the refcounts on the fds being moved will not change here.
- This is why the last param in the following two functions is 'false') */
- polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
- polling_island_remove_all_fds_locked(p, false, error);
-
- /* Wakeup all the pollers (if any) on p so that they pickup this change */
- polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);
-
- /* Add the 'merged_to' link from p --> q */
- gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
- PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
- }
- /* else if p == q, nothing needs to be done */
-
- polling_island_unlock_pair(p, q);
-
- /* Return the merged polling island (Note that no merge would have happened
- if p == q which is ok) */
- return q;
-}
-
-static grpc_error *polling_island_global_init() {
- grpc_error *error = GRPC_ERROR_NONE;
-
- error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
- if (error == GRPC_ERROR_NONE) {
- error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
- }
-
- return error;
-}
-
-static void polling_island_global_shutdown() {
- grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
-}
-
-/*******************************************************************************
- * Fd Definitions
- */
-
-/* We need to keep a freelist not because of any concerns of malloc performance
- * but instead so that implementations with multiple threads in (for example)
- * epoll_wait deal with the race between pollset removal and incoming poll
- * notifications.
- *
- * The problem is that the poller ultimately holds a reference to this
- * object, so it is very difficult to know when is safe to free it, at least
- * without some expensive synchronization.
- *
- * If we keep the object freelisted, in the worst case losing this race just
- * becomes a spurious read notification on a reused fd.
- */
-
-/* The alarm system needs to be able to wakeup 'some poller' sometimes
- * (specifically when a new alarm needs to be triggered earlier than the next
- * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
- * case occurs. */
-
-static grpc_fd *fd_freelist = NULL;
-static gpr_mu fd_freelist_mu;
-
-#ifndef NDEBUG
-#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
-#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
-static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
- int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
- gpr_log(GPR_DEBUG,
- "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
- fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
- gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
- }
-#else
-#define REF_BY(fd, n, reason) ref_by(fd, n)
-#define UNREF_BY(fd, n, reason) unref_by(fd, n)
-static void ref_by(grpc_fd *fd, int n) {
-#endif
- GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
-}
-
-#ifndef NDEBUG
-static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
- int line) {
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
- gpr_log(GPR_DEBUG,
- "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
- fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
- gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
- }
-#else
-static void unref_by(grpc_fd *fd, int n) {
-#endif
- gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
- if (old == n) {
- /* Add the fd to the freelist */
- gpr_mu_lock(&fd_freelist_mu);
- fd->freelist_next = fd_freelist;
- fd_freelist = fd;
- grpc_iomgr_unregister_object(&fd->iomgr_object);
-
- grpc_lfev_destroy(&fd->read_closure);
- grpc_lfev_destroy(&fd->write_closure);
-
- gpr_mu_unlock(&fd_freelist_mu);
- } else {
- GPR_ASSERT(old > n);
- }
-}
-
-/* Increment refcount by two to avoid changing the orphan bit */
-#ifndef NDEBUG
-static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
- int line) {
- ref_by(fd, 2, reason, file, line);
-}
-
-static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
- int line) {
- unref_by(fd, 2, reason, file, line);
-}
-#else
-static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
-static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-#endif
-
-static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
-
-static void fd_global_shutdown(void) {
- gpr_mu_lock(&fd_freelist_mu);
- gpr_mu_unlock(&fd_freelist_mu);
- while (fd_freelist != NULL) {
- grpc_fd *fd = fd_freelist;
- fd_freelist = fd_freelist->freelist_next;
- gpr_mu_destroy(&fd->po.mu);
- gpr_free(fd);
- }
- gpr_mu_destroy(&fd_freelist_mu);
-}
-
-static grpc_fd *fd_create(int fd, const char *name) {
- grpc_fd *new_fd = NULL;
-
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist != NULL) {
- new_fd = fd_freelist;
- fd_freelist = fd_freelist->freelist_next;
- }
- gpr_mu_unlock(&fd_freelist_mu);
-
- if (new_fd == NULL) {
- new_fd = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&new_fd->po.mu);
- }
-
- /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
- * is a newly created fd (or an fd we got from the freelist), no one else
- * would be holding a lock to it anyway. */
- gpr_mu_lock(&new_fd->po.mu);
- new_fd->po.pi = NULL;
-#ifndef NDEBUG
- new_fd->po.obj_type = POLL_OBJ_FD;
-#endif
-
- gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
- new_fd->fd = fd;
- new_fd->orphaned = false;
- grpc_lfev_init(&new_fd->read_closure);
- grpc_lfev_init(&new_fd->write_closure);
- gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL);
-
- new_fd->freelist_next = NULL;
- new_fd->on_done_closure = NULL;
-
- gpr_mu_unlock(&new_fd->po.mu);
-
- char *fd_name;
- gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
- grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
-#ifndef NDEBUG
- if (GRPC_TRACER_ON(grpc_trace_fd_refcount)) {
- gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
- }
-#endif
- gpr_free(fd_name);
- return new_fd;
-}
-
-static int fd_wrapped_fd(grpc_fd *fd) {
- int ret_fd = -1;
- gpr_mu_lock(&fd->po.mu);
- if (!fd->orphaned) {
- ret_fd = fd->fd;
- }
- gpr_mu_unlock(&fd->po.mu);
-
- return ret_fd;
-}
-
-static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *on_done, int *release_fd,
- bool already_closed, const char *reason) {
- grpc_error *error = GRPC_ERROR_NONE;
- polling_island *unref_pi = NULL;
-
- gpr_mu_lock(&fd->po.mu);
- fd->on_done_closure = on_done;
-
- /* Remove the active status but keep referenced. We want this grpc_fd struct
- to be alive (and not added to freelist) until the end of this function */
- REF_BY(fd, 1, reason);
-
- /* Remove the fd from the polling island:
- - Get a lock on the latest polling island (i.e the last island in the
- linked list pointed by fd->po.pi). This is the island that
- would actually contain the fd
- - Remove the fd from the latest polling island
- - Unlock the latest polling island
- - Set fd->po.pi to NULL (but remove the ref on the polling island
- before doing this.) */
- if (fd->po.pi != NULL) {
- polling_island *pi_latest = polling_island_lock(fd->po.pi);
- polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error);
- gpr_mu_unlock(&pi_latest->mu);
-
- unref_pi = fd->po.pi;
- fd->po.pi = NULL;
- }
-
- /* If release_fd is not NULL, we should be relinquishing control of the file
- descriptor fd->fd (but we still own the grpc_fd structure). */
- if (release_fd != NULL) {
- *release_fd = fd->fd;
- } else {
- close(fd->fd);
- }
-
- fd->orphaned = true;
-
- GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
-
- gpr_mu_unlock(&fd->po.mu);
- UNREF_BY(fd, 2, reason); /* Drop the reference */
- if (unref_pi != NULL) {
- /* Unref stale polling island here, outside the fd lock above.
- The polling island owns a workqueue which owns an fd, and unreffing
- inside the lock can cause an eventual lock loop that makes TSAN very
- unhappy. */
- PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
- }
- GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
- GRPC_ERROR_UNREF(error);
-}
-
-static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
- grpc_fd *fd) {
- gpr_atm notifier = gpr_atm_acq_load(&fd->read_notifier_pollset);
- return (grpc_pollset *)notifier;
-}
-
-static bool fd_is_shutdown(grpc_fd *fd) {
- return grpc_lfev_is_shutdown(&fd->read_closure);
-}
-
-/* Might be called multiple times */
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
- if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
- GRPC_ERROR_REF(why))) {
- shutdown(fd->fd, SHUT_RDWR);
- grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
- }
- GRPC_ERROR_UNREF(why);
-}
-
-static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
-}
-
-static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
-}
-
-/*******************************************************************************
- * Pollset Definitions
- */
-GPR_TLS_DECL(g_current_thread_pollset);
-GPR_TLS_DECL(g_current_thread_worker);
-static __thread bool g_initialized_sigmask;
-static __thread sigset_t g_orig_sigmask;
-static __thread sigset_t g_wakeup_sig_set;
-
-static void sig_handler(int sig_num) {
-#ifdef GRPC_EPOLL_DEBUG
- gpr_log(GPR_INFO, "Received signal %d", sig_num);
-#endif
-}
-
-static void pollset_worker_init(grpc_pollset_worker *worker) {
- worker->pt_id = pthread_self();
- worker->next = worker->prev = NULL;
- gpr_atm_no_barrier_store(&worker->is_kicked, (gpr_atm)0);
- gpr_atm_no_barrier_store(&worker->is_polling_turn, (gpr_atm)0);
- worker_node_init(&worker->pi_list_link);
-}
-
-static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }
-
-/* Global state management */
-static grpc_error *pollset_global_init(void) {
- gpr_tls_init(&g_current_thread_pollset);
- gpr_tls_init(&g_current_thread_worker);
- poller_kick_init();
- return GRPC_ERROR_NONE;
-}
-
-static void pollset_global_shutdown(void) {
- gpr_tls_destroy(&g_current_thread_pollset);
- gpr_tls_destroy(&g_current_thread_worker);
-}
-
-static grpc_error *worker_kick(grpc_pollset_worker *worker,
- gpr_atm *is_kicked) {
- grpc_error *err = GRPC_ERROR_NONE;
-
- /* Kick the worker only if it was not already kicked */
- if (gpr_atm_no_barrier_cas(is_kicked, (gpr_atm)0, (gpr_atm)1)) {
- GRPC_POLLING_TRACE(
- "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
- (void *)worker, (long int)worker->pt_id);
- int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
- if (err_num != 0) {
- err = GRPC_OS_ERROR(err_num, "pthread_kill");
- }
- }
- return err;
-}
-
-static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
- return worker_kick(worker, &worker->is_kicked);
-}
-
-static grpc_error *poller_kick(grpc_pollset_worker *worker) {
- return worker_kick(worker, &worker->is_polling_turn);
-}
-
-/* Return 1 if the pollset has active threads in pollset_work (pollset must
- * be locked) */
-static int pollset_has_workers(grpc_pollset *p) {
- return p->root_worker.next != &p->root_worker;
-}
-
-static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->prev->next = worker->next;
- worker->next->prev = worker->prev;
-}
-
-static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
- if (pollset_has_workers(p)) {
- grpc_pollset_worker *w = p->root_worker.next;
- remove_worker(p, w);
- return w;
- } else {
- return NULL;
- }
-}
-
-static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->next = &p->root_worker;
- worker->prev = worker->next->prev;
- worker->prev->next = worker->next->prev = worker;
-}
-
-static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->prev = &p->root_worker;
- worker->next = worker->prev->next;
- worker->prev->next = worker->next->prev = worker;
-}
-
-/* p->mu must be held before calling this function */
-static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *p,
- grpc_pollset_worker *specific_worker) {
- GPR_TIMER_BEGIN("pollset_kick", 0);
- grpc_error *error = GRPC_ERROR_NONE;
- const char *err_desc = "Kick Failure";
- grpc_pollset_worker *worker = specific_worker;
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- if (worker != NULL) {
- if (worker == GRPC_POLLSET_KICK_BROADCAST) {
- if (pollset_has_workers(p)) {
- GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
- for (worker = p->root_worker.next; worker != &p->root_worker;
- worker = worker->next) {
- if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
- append_error(&error, pollset_worker_kick(worker), err_desc);
- }
- }
- GPR_TIMER_END("pollset_kick.broadcast", 0);
- } else {
- p->kicked_without_pollers = true;
- }
- } else {
- GPR_TIMER_MARK("kicked_specifically", 0);
- if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
- append_error(&error, pollset_worker_kick(worker), err_desc);
- }
- }
- } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
- /* Since worker == NULL, it means that we can kick "any" worker on this
- pollset 'p'. If 'p' happens to be the same pollset this thread is
- currently polling (i.e in pollset_work() function), then there is no need
- to kick any other worker since the current thread can just absorb the
- kick. This is the reason why we enter this case only when
- g_current_thread_pollset is != p */
-
- GPR_TIMER_MARK("kick_anonymous", 0);
- worker = pop_front_worker(p);
- if (worker != NULL) {
- GPR_TIMER_MARK("finally_kick", 0);
- push_back_worker(p, worker);
- append_error(&error, pollset_worker_kick(worker), err_desc);
- } else {
- GPR_TIMER_MARK("kicked_no_pollers", 0);
- p->kicked_without_pollers = true;
- }
- }
-
- GPR_TIMER_END("pollset_kick", 0);
- GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
- return error;
-}
-
-static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- gpr_mu_init(&pollset->po.mu);
- *mu = &pollset->po.mu;
- pollset->po.pi = NULL;
-#ifndef NDEBUG
- pollset->po.obj_type = POLL_OBJ_POLLSET;
-#endif
-
- pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
- pollset->kicked_without_pollers = false;
-
- pollset->shutting_down = false;
- pollset->finish_shutdown_called = false;
- pollset->shutdown_done = NULL;
-}
-
-/* Convert millis to timespec (clock-type is assumed to be GPR_TIMESPAN) */
-static struct timespec millis_to_timespec(int millis) {
- struct timespec linux_ts;
- gpr_timespec gpr_ts;
-
- if (millis == -1) {
- gpr_ts = gpr_inf_future(GPR_TIMESPAN);
- } else {
- gpr_ts = gpr_time_from_millis(millis, GPR_TIMESPAN);
- }
-
- linux_ts.tv_sec = (time_t)gpr_ts.tv_sec;
- linux_ts.tv_nsec = gpr_ts.tv_nsec;
- return linux_ts;
-}
-
-/* Convert a timespec to milliseconds:
- - Very small or negative poll times are clamped to zero to do a non-blocking
- poll (which becomes spin polling)
- - Other small values are rounded up to one millisecond
- - Longer than a millisecond polls are rounded up to the next nearest
- millisecond to avoid spinning
- - Infinite timeouts are converted to -1 */
-static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
- gpr_timespec now) {
- gpr_timespec timeout;
- static const int64_t max_spin_polling_us = 10;
- if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
- return -1;
- }
-
- if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
- max_spin_polling_us,
- GPR_TIMESPAN))) <= 0) {
- return 0;
- }
- timeout = gpr_time_sub(deadline, now);
- int millis = gpr_time_to_millis(gpr_time_add(
- timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
- return millis >= 1 ? millis : 1;
-}
-
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
- /* Use release store to match with acquire load in fd_get_read_notifier */
- gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
-}
-
-static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
-}
-
-static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
- grpc_pollset *ps, char *reason) {
- if (ps->po.pi != NULL) {
- PI_UNREF(exec_ctx, ps->po.pi, reason);
- }
- ps->po.pi = NULL;
-}
-
-static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset) {
- /* The pollset cannot have any workers if we are at this stage */
- GPR_ASSERT(!pollset_has_workers(pollset));
-
- pollset->finish_shutdown_called = true;
-
- /* Release the ref and set pollset->po.pi to NULL */
- pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
-}
-
-/* pollset->po.mu lock must be held by the caller before calling this */
-static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_closure *closure) {
- GPR_TIMER_BEGIN("pollset_shutdown", 0);
- GPR_ASSERT(!pollset->shutting_down);
- pollset->shutting_down = true;
- pollset->shutdown_done = closure;
- pollset_kick(exec_ctx, pollset, GRPC_POLLSET_KICK_BROADCAST);
-
- /* If the pollset has any workers, we cannot call finish_shutdown_locked()
- because it would release the underlying polling island. In such a case, we
- let the last worker call finish_shutdown_locked() from pollset_work() */
- if (!pollset_has_workers(pollset)) {
- GPR_ASSERT(!pollset->finish_shutdown_called);
- GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
- finish_shutdown_locked(exec_ctx, pollset);
- }
- GPR_TIMER_END("pollset_shutdown", 0);
-}
-
-/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
- * than destroying the mutexes, there is nothing special that needs to be done
- * here */
-static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- GPR_ASSERT(!pollset_has_workers(pollset));
- gpr_mu_destroy(&pollset->po.mu);
-}
-
-/* NOTE: This function may modify 'now' */
-static bool acquire_polling_lease(grpc_exec_ctx *exec_ctx,
- grpc_pollset_worker *worker,
- polling_island *pi, gpr_timespec deadline,
- gpr_timespec *now) {
- bool is_lease_acquired = false;
-
- gpr_mu_lock(&pi->worker_list_mu); // LOCK
- long num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
-
- if (num_pollers >= g_max_pollers_per_pi) {
- push_back_worker_node(&pi->worker_list_head, &worker->pi_list_link);
- gpr_mu_unlock(&pi->worker_list_mu); // UNLOCK
-
- bool is_timeout = false;
- int ret;
- int timeout_ms = poll_deadline_to_millis_timeout(deadline, *now);
- if (timeout_ms == -1) {
- ret = sigwaitinfo(&g_wakeup_sig_set, NULL);
- } else {
- struct timespec sigwait_timeout = millis_to_timespec(timeout_ms);
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- GRPC_STATS_INC_SYSCALL_WAIT(exec_ctx);
- ret = sigtimedwait(&g_wakeup_sig_set, NULL, &sigwait_timeout);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
- }
-
- if (ret == -1) {
- if (errno == EAGAIN) {
- is_timeout = true;
- } else {
- /* NOTE: This should not happen. If we see these log messages, it means
- we are most likely doing something incorrect in the setup * needed
- for sigwaitinfo/sigtimedwait */
- gpr_log(GPR_ERROR,
- "sigtimedwait failed with retcode: %d (timeout_ms: %d)", errno,
- timeout_ms);
- }
- }
-
- /* Did the worker come out of sigtimedwait due to a thread that just
- exited epoll and kicking it (in release_polling_lease function). */
- bool is_polling_turn = gpr_atm_acq_load(&worker->is_polling_turn);
-
- /* Did the worker come out of sigtimedwait due to a thread alerting it that
- some completion event was (likely) available in the completion queue */
- bool is_kicked = gpr_atm_no_barrier_load(&worker->is_kicked);
-
- if (is_kicked || is_timeout) {
- *now = deadline; /* Essentially make the epoll timeout = 0 */
- } else if (is_polling_turn) {
- *now = gpr_now(GPR_CLOCK_MONOTONIC); /* Reduce the epoll timeout */
- }
-
- gpr_mu_lock(&pi->worker_list_mu); // LOCK
- /* The node might have already been removed from the list by the poller
- that kicked this. However it is safe to call 'remove_worker_node' on
- an already detached node */
- remove_worker_node(&worker->pi_list_link);
- /* It is important to read the num_pollers again under the lock so that we
- * have the latest num_pollers value that doesn't change while we are doing
- * the "(num_pollers < g_max_pollers_per_pi)" a a few lines below */
- num_pollers = gpr_atm_no_barrier_load(&pi->poller_count);
- }
-
- if (num_pollers < g_max_pollers_per_pi) {
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
- is_lease_acquired = true;
- }
-
- gpr_mu_unlock(&pi->worker_list_mu); // UNLOCK
- return is_lease_acquired;
-}
-
-static void release_polling_lease(polling_island *pi, grpc_error **error) {
- gpr_mu_lock(&pi->worker_list_mu);
-
- gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);
- worker_node *node = pop_front_worker_node(&pi->worker_list_head);
- if (node != NULL) {
- grpc_pollset_worker *next_worker = WORKER_FROM_WORKER_LIST_NODE(node);
- append_error(error, poller_kick(next_worker), "poller kick error");
- }
-
- gpr_mu_unlock(&pi->worker_list_mu);
-}
-
-#define GRPC_EPOLL_MAX_EVENTS 100
-static void pollset_do_epoll_pwait(grpc_exec_ctx *exec_ctx, int epoll_fd,
- grpc_pollset *pollset, polling_island *pi,
- grpc_pollset_worker *worker,
- gpr_timespec now, gpr_timespec deadline,
- sigset_t *sig_mask, grpc_error **error) {
- /* Only g_max_pollers_per_pi threads can be doing polling in parallel.
- If we cannot get a lease, we cannot continue to do epoll_pwait() */
- if (!acquire_polling_lease(exec_ctx, worker, pi, deadline, &now)) {
- return;
- }
-
- struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
- int ep_rv;
- char *err_msg;
- const char *err_desc = "pollset_work_and_unlock";
-
- /* timeout_ms is the time between 'now' and 'deadline' */
- int timeout_ms = poll_deadline_to_millis_timeout(deadline, now);
-
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
- ep_rv =
- epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
-
- /* Give back the lease right away so that some other thread can enter */
- release_polling_lease(pi, error);
-
- if (ep_rv < 0) {
- if (errno != EINTR) {
- gpr_asprintf(&err_msg,
- "epoll_wait() epoll fd: %d failed with error: %d (%s)",
- epoll_fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- } else {
- /* We were interrupted. Save an interation by doing a zero timeout
- epoll_wait to see if there are any other events of interest */
- GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
- (void *)pollset, (void *)worker);
- ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
- }
- }
-
-#ifdef GRPC_TSAN
- /* See the definition of g_poll_sync for more details */
- gpr_atm_acq_load(&g_epoll_sync);
-#endif /* defined(GRPC_TSAN) */
-
- for (int i = 0; i < ep_rv; ++i) {
- void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &polling_island_wakeup_fd) {
- GRPC_POLLING_TRACE(
- "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
- "%d) got merged",
- (void *)pollset, (void *)worker, epoll_fd);
- /* This means that our polling island is merged with a different
- island. We do not have to do anything here since the subsequent call
- to the function pollset_work_and_unlock() will pick up the correct
- epoll_fd */
- } else {
- grpc_fd *fd = data_ptr;
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd, pollset);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
- }
- }
-}
-
-/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
-static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset,
- grpc_pollset_worker *worker,
- gpr_timespec now, gpr_timespec deadline,
- sigset_t *sig_mask, grpc_error **error) {
- int epoll_fd = -1;
- polling_island *pi = NULL;
- GPR_TIMER_BEGIN("pollset_work_and_unlock", 0);
-
- /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the
- latest polling island pointed by pollset->po.pi
-
- Since epoll_fd is immutable, it is safe to read it without a lock on the
- polling island. There is however a possibility that the polling island from
- which we got the epoll_fd, got merged with another island in the meantime.
- This is okay because in such a case, we will wakeup right-away from
- epoll_pwait() (because any merge will poison the old polling island's epoll
- set 'polling_island_wakeup_fd') and then pick up the latest polling_island
- the next time this function - pollset_work_and_unlock()) is called */
-
- if (pollset->po.pi == NULL) {
- pollset->po.pi = polling_island_create(exec_ctx, NULL, error);
- if (pollset->po.pi == NULL) {
- GPR_TIMER_END("pollset_work_and_unlock", 0);
- return; /* Fatal error. Cannot continue */
- }
-
- PI_ADD_REF(pollset->po.pi, "ps");
- GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
- (void *)pollset, (void *)pollset->po.pi);
- }
-
- pi = polling_island_maybe_get_latest(pollset->po.pi);
- epoll_fd = pi->epoll_fd;
-
- /* Update the pollset->po.pi since the island being pointed by
- pollset->po.pi maybe older than the one pointed by pi) */
- if (pollset->po.pi != pi) {
- /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
- polling island to be deleted */
- PI_ADD_REF(pi, "ps");
- PI_UNREF(exec_ctx, pollset->po.pi, "ps");
- pollset->po.pi = pi;
- }
-
- /* Add an extra ref so that the island does not get destroyed (which means
- the epoll_fd won't be closed) while we are are doing an epoll_wait() on the
- epoll_fd */
- PI_ADD_REF(pi, "ps_work");
- gpr_mu_unlock(&pollset->po.mu);
-
- g_current_thread_polling_island = pi;
- pollset_do_epoll_pwait(exec_ctx, epoll_fd, pollset, pi, worker, now, deadline,
- sig_mask, error);
- g_current_thread_polling_island = NULL;
-
- GPR_ASSERT(pi != NULL);
-
- /* Before leaving, release the extra ref we added to the polling island. It
- is important to use "pi" here (i.e our old copy of pollset->po.pi
- that we got before releasing the polling island lock). This is because
- pollset->po.pi pointer might get udpated in other parts of the
- code when there is an island merge while we are doing epoll_wait() above */
- PI_UNREF(exec_ctx, pi, "ps_work");
-
- GPR_TIMER_END("pollset_work_and_unlock", 0);
-}
-
-/* pollset->po.mu lock must be held by the caller before calling this.
- The function pollset_work() may temporarily release the lock (pollset->po.mu)
- during the course of its execution but it will always re-acquire the lock and
- ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker_hdl,
- gpr_timespec now, gpr_timespec deadline) {
- GPR_TIMER_BEGIN("pollset_work", 0);
- grpc_error *error = GRPC_ERROR_NONE;
-
- grpc_pollset_worker worker;
- pollset_worker_init(&worker);
-
- if (worker_hdl) *worker_hdl = &worker;
-
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
- gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
-
- if (pollset->kicked_without_pollers) {
- /* If the pollset was kicked without pollers, pretend that the current
- worker got the kick and skip polling. A kick indicates that there is some
- work that needs attention like an event on the completion queue or an
- alarm */
- GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
- pollset->kicked_without_pollers = 0;
- } else if (!pollset->shutting_down) {
- /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
- (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
- worker that there is some pending work that needs immediate attention
- (like an event on the completion queue, or a polling island merge that
- results in a new epoll-fd to wait on) and that the worker should not
- spend time waiting in epoll_pwait().
-
- A worker can be kicked anytime from the point it is added to the pollset
- via push_front_worker() (or push_back_worker()) to the point it is
- removed via remove_worker().
- If the worker is kicked before/during it calls epoll_pwait(), it should
- immediately exit from epoll_wait(). If the worker is kicked after it
- returns from epoll_wait(), then nothing really needs to be done.
-
- To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
- times *except* when it is in epoll_pwait(). This way, the worker never
- misses acting on a kick */
-
- if (!g_initialized_sigmask) {
- sigemptyset(&g_wakeup_sig_set);
- sigaddset(&g_wakeup_sig_set, grpc_wakeup_signal);
- pthread_sigmask(SIG_BLOCK, &g_wakeup_sig_set, &g_orig_sigmask);
- sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
- g_initialized_sigmask = true;
- /* new_mask: The new thread mask which blocks 'grpc_wakeup_signal'.
- This is the mask used at all times *except during
- epoll_wait()*"
- g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
- this is the mask to use *during epoll_wait()*
-
- The new_mask is set on the worker before it is added to the pollset
- (i.e before it can be kicked) */
- }
-
- push_front_worker(pollset, &worker); /* Add worker to pollset */
-
- pollset_work_and_unlock(exec_ctx, pollset, &worker, now, deadline,
- &g_orig_sigmask, &error);
- grpc_exec_ctx_flush(exec_ctx);
-
- gpr_mu_lock(&pollset->po.mu);
-
- /* Note: There is no need to reset worker.is_kicked to 0 since we are no
- longer going to use this worker */
- remove_worker(pollset, &worker);
- }
-
- /* If we are the last worker on the pollset (i.e pollset_has_workers() is
- false at this point) and the pollset is shutting down, we may have to
- finish the shutdown process by calling finish_shutdown_locked().
- See pollset_shutdown() for more details.
-
- Note: Continuing to access pollset here is safe; it is the caller's
- responsibility to not destroy a pollset when it has outstanding calls to
- pollset_work() */
- if (pollset->shutting_down && !pollset_has_workers(pollset) &&
- !pollset->finish_shutdown_called) {
- GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
- finish_shutdown_locked(exec_ctx, pollset);
-
- gpr_mu_unlock(&pollset->po.mu);
- grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->po.mu);
- }
-
- if (worker_hdl) *worker_hdl = NULL;
-
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
- gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
-
- GPR_TIMER_END("pollset_work", 0);
-
- GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
- return error;
-}
-
-static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag,
- poll_obj_type bag_type, poll_obj *item,
- poll_obj_type item_type) {
- GPR_TIMER_BEGIN("add_poll_object", 0);
-
-#ifndef NDEBUG
- GPR_ASSERT(item->obj_type == item_type);
- GPR_ASSERT(bag->obj_type == bag_type);
-#endif
-
- grpc_error *error = GRPC_ERROR_NONE;
- polling_island *pi_new = NULL;
-
- gpr_mu_lock(&bag->mu);
- gpr_mu_lock(&item->mu);
-
-retry:
- /*
- * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
- * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
- * a refcount of 2) and point item->pi and bag->pi to the new island
- * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
- * the other's non-NULL pi
- * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the
- * polling islands and update item->pi and bag->pi to point to the new
- * island
- */
-
- /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
- * orphaned */
- if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
- gpr_mu_unlock(&item->mu);
- gpr_mu_unlock(&bag->mu);
- return;
- }
-
- if (item->pi == bag->pi) {
- pi_new = item->pi;
- if (pi_new == NULL) {
- /* GPR_ASSERT(item->pi == bag->pi == NULL) */
-
- /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
- * we need to do some extra work to make TSAN happy */
- if (item_type == POLL_OBJ_FD) {
- /* Unlock before creating a new polling island: the polling island will
- create a workqueue which creates a file descriptor, and holding an fd
- lock here can eventually cause a loop to appear to TSAN (making it
- unhappy). We don't think it's a real loop (there's an epoch point
- where that loop possibility disappears), but the advantages of
- keeping TSAN happy outweigh any performance advantage we might have
- by keeping the lock held. */
- gpr_mu_unlock(&item->mu);
- pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error);
- gpr_mu_lock(&item->mu);
-
- /* Need to reverify any assumptions made between the initial lock and
- getting to this branch: if they've changed, we need to throw away our
- work and figure things out again. */
- if (item->pi != NULL) {
- GRPC_POLLING_TRACE(
- "add_poll_object: Raced creating new polling island. pi_new: %p "
- "(fd: %d, %s: %p)",
- (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
- (void *)bag);
- /* No need to lock 'pi_new' here since this is a new polling island
- and no one has a reference to it yet */
- polling_island_remove_all_fds_locked(pi_new, true, &error);
-
- /* Ref and unref so that the polling island gets deleted during unref
- */
- PI_ADD_REF(pi_new, "dance_of_destruction");
- PI_UNREF(exec_ctx, pi_new, "dance_of_destruction");
- goto retry;
- }
- } else {
- pi_new = polling_island_create(exec_ctx, NULL, &error);
- }
-
- GRPC_POLLING_TRACE(
- "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
- "%s: %p)",
- (void *)pi_new, poll_obj_string(item_type), (void *)item,
- poll_obj_string(bag_type), (void *)bag);
- } else {
- GRPC_POLLING_TRACE(
- "add_poll_object: Same polling island. pi: %p (%s, %s)",
- (void *)pi_new, poll_obj_string(item_type),
- poll_obj_string(bag_type));
- }
- } else if (item->pi == NULL) {
- /* GPR_ASSERT(bag->pi != NULL) */
- /* Make pi_new point to latest pi*/
- pi_new = polling_island_lock(bag->pi);
-
- if (item_type == POLL_OBJ_FD) {
- grpc_fd *fd = FD_FROM_PO(item);
- polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
- }
-
- gpr_mu_unlock(&pi_new->mu);
- GRPC_POLLING_TRACE(
- "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
- "bag(%s): %p)",
- (void *)pi_new, poll_obj_string(item_type), (void *)item,
- poll_obj_string(bag_type), (void *)bag);
- } else if (bag->pi == NULL) {
- /* GPR_ASSERT(item->pi != NULL) */
- /* Make pi_new to point to latest pi */
- pi_new = polling_island_lock(item->pi);
- gpr_mu_unlock(&pi_new->mu);
- GRPC_POLLING_TRACE(
- "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
- "bag(%s): %p)",
- (void *)pi_new, poll_obj_string(item_type), (void *)item,
- poll_obj_string(bag_type), (void *)bag);
- } else {
- pi_new = polling_island_merge(item->pi, bag->pi, &error);
- GRPC_POLLING_TRACE(
- "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
- "bag(%s): %p)",
- (void *)pi_new, poll_obj_string(item_type), (void *)item,
- poll_obj_string(bag_type), (void *)bag);
- }
-
- /* At this point, pi_new is the polling island that both item->pi and bag->pi
- MUST be pointing to */
-
- if (item->pi != pi_new) {
- PI_ADD_REF(pi_new, poll_obj_string(item_type));
- if (item->pi != NULL) {
- PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type));
- }
- item->pi = pi_new;
- }
-
- if (bag->pi != pi_new) {
- PI_ADD_REF(pi_new, poll_obj_string(bag_type));
- if (bag->pi != NULL) {
- PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type));
- }
- bag->pi = pi_new;
- }
-
- gpr_mu_unlock(&item->mu);
- gpr_mu_unlock(&bag->mu);
-
- GRPC_LOG_IF_ERROR("add_poll_object", error);
- GPR_TIMER_END("add_poll_object", 0);
-}
-
-static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_fd *fd) {
- add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po,
- POLL_OBJ_FD);
-}
-
-/*******************************************************************************
- * Pollset-set Definitions
- */
-
-static grpc_pollset_set *pollset_set_create(void) {
- grpc_pollset_set *pss = gpr_malloc(sizeof(*pss));
- gpr_mu_init(&pss->po.mu);
- pss->po.pi = NULL;
-#ifndef NDEBUG
- pss->po.obj_type = POLL_OBJ_POLLSET_SET;
-#endif
- return pss;
-}
-
-static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss) {
- gpr_mu_destroy(&pss->po.mu);
-
- if (pss->po.pi != NULL) {
- PI_UNREF(exec_ctx, pss->po.pi, "pss_destroy");
- }
-
- gpr_free(pss);
-}
-
-static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
- grpc_fd *fd) {
- add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po,
- POLL_OBJ_FD);
-}
-
-static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
- grpc_fd *fd) {
- /* Nothing to do */
-}
-
-static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {
- add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po,
- POLL_OBJ_POLLSET);
-}
-
-static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {
- /* Nothing to do */
-}
-
-static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po,
- POLL_OBJ_POLLSET_SET);
-}
-
-static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- /* Nothing to do */
-}
-
-/*******************************************************************************
- * Event engine binding
- */
-
-static void shutdown_engine(void) {
- fd_global_shutdown();
- pollset_global_shutdown();
- polling_island_global_shutdown();
-}
-
-static const grpc_event_engine_vtable vtable = {
- .pollset_size = sizeof(grpc_pollset),
-
- .fd_create = fd_create,
- .fd_wrapped_fd = fd_wrapped_fd,
- .fd_orphan = fd_orphan,
- .fd_shutdown = fd_shutdown,
- .fd_is_shutdown = fd_is_shutdown,
- .fd_notify_on_read = fd_notify_on_read,
- .fd_notify_on_write = fd_notify_on_write,
- .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-
- .pollset_init = pollset_init,
- .pollset_shutdown = pollset_shutdown,
- .pollset_destroy = pollset_destroy,
- .pollset_work = pollset_work,
- .pollset_kick = pollset_kick,
- .pollset_add_fd = pollset_add_fd,
-
- .pollset_set_create = pollset_set_create,
- .pollset_set_destroy = pollset_set_destroy,
- .pollset_set_add_pollset = pollset_set_add_pollset,
- .pollset_set_del_pollset = pollset_set_del_pollset,
- .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
- .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
- .pollset_set_add_fd = pollset_set_add_fd,
- .pollset_set_del_fd = pollset_set_del_fd,
-
- .shutdown_engine = shutdown_engine,
-};
-
-/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
-static bool is_epoll_available() {
- int fd = epoll_create1(EPOLL_CLOEXEC);
- if (fd < 0) {
- gpr_log(
- GPR_ERROR,
- "epoll_create1 failed with error: %d. Not using epoll polling engine",
- fd);
- return false;
- }
- close(fd);
- return true;
-}
-
-/* This is mainly for testing purposes. Checks to see if environment variable
- * GRPC_MAX_POLLERS_PER_PI is set and if so, assigns that value to
- * g_max_pollers_per_pi (any negative value is considered INT_MAX) */
-static void set_max_pollers_per_island() {
- char *s = gpr_getenv("GRPC_MAX_POLLERS_PER_PI");
- if (s) {
- g_max_pollers_per_pi = (int)strtol(s, NULL, 10);
- if (g_max_pollers_per_pi < 0) {
- g_max_pollers_per_pi = INT_MAX;
- }
- } else {
- g_max_pollers_per_pi = INT_MAX;
- }
-
- gpr_log(GPR_INFO, "Max number of pollers per polling island: %d",
- g_max_pollers_per_pi);
-}
-
-const grpc_event_engine_vtable *grpc_init_epoll_limited_pollers_linux(
- bool explicitly_requested) {
- if (!explicitly_requested) {
- return NULL;
- }
-
- /* If use of signals is disabled, we cannot use epoll engine*/
- if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
- return NULL;
- }
-
- if (!grpc_has_wakeup_fd()) {
- return NULL;
- }
-
- if (!is_epoll_available()) {
- return NULL;
- }
-
- if (!is_grpc_wakeup_signal_initialized) {
- grpc_use_signal(SIGRTMIN + 6);
- }
-
- set_max_pollers_per_island();
-
- fd_global_init();
-
- if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
- return NULL;
- }
-
- if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
- polling_island_global_init())) {
- return NULL;
- }
-
- return &vtable;
-}
-
-#else /* defined(GRPC_LINUX_EPOLL) */
-#if defined(GRPC_POSIX_SOCKET)
-#include "src/core/lib/iomgr/ev_posix.h"
-/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
- * NULL */
-const grpc_event_engine_vtable *grpc_init_epoll_limited_pollers_linux(
- bool explicitly_requested) {
- return NULL;
-}
-#endif /* defined(GRPC_POSIX_SOCKET) */
-#endif /* !defined(GRPC_LINUX_EPOLL) */
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.h b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.h
deleted file mode 100644
index 1d6af5f52c..0000000000
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_EV_EPOLL_LIMITED_POLLERS_LINUX_H
-#define GRPC_CORE_LIB_IOMGR_EV_EPOLL_LIMITED_POLLERS_LINUX_H
-
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/iomgr/port.h"
-
-const grpc_event_engine_vtable *grpc_init_epoll_limited_pollers_linux(
- bool explicitly_requested);
-
-#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL_LIMITED_POLLERS_LINUX_H */
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
deleted file mode 100644
index ace491550b..0000000000
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ /dev/null
@@ -1,1185 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "src/core/lib/iomgr/port.h"
-
-/* This polling engine is only relevant on linux kernels supporting epoll() */
-#ifdef GRPC_LINUX_EPOLL
-
-#include "src/core/lib/iomgr/ev_epoll_thread_pool_linux.h"
-
-#include <assert.h>
-#include <errno.h>
-#include <limits.h>
-#include <poll.h>
-#include <pthread.h>
-#include <string.h>
-#include <sys/epoll.h>
-#include <sys/socket.h>
-#include <unistd.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc/support/cpu.h>
-#include <grpc/support/log.h>
-#include <grpc/support/string_util.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/tls.h>
-#include <grpc/support/useful.h>
-
-#include "src/core/lib/debug/stats.h"
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/iomgr/iomgr_internal.h"
-#include "src/core/lib/iomgr/lockfree_event.h"
-#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/profiling/timers.h"
-#include "src/core/lib/support/block_annotate.h"
-
-/* TODO: sreek - Move this to init.c and initialize this like other tracers. */
-#define GRPC_POLLING_TRACE(fmt, ...) \
- if (GRPC_TRACER_ON(grpc_polling_trace)) { \
- gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \
- }
-
-/* The alarm system needs to be able to wakeup 'some poller' sometimes
- * (specifically when a new alarm needs to be triggered earlier than the next
- * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
- * case occurs. */
-
-struct epoll_set;
-
-#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1)
-
-/*******************************************************************************
- * Fd Declarations
- */
-struct grpc_fd {
- gpr_mu mu;
- struct epoll_set *eps;
-
- int fd;
-
- /* The fd is either closed or we relinquished control of it. In either cases,
- this indicates that the 'fd' on this structure is no longer valid */
- bool orphaned;
-
- gpr_atm read_closure;
- gpr_atm write_closure;
-
- struct grpc_fd *freelist_next;
- grpc_closure *on_done_closure;
-
- grpc_iomgr_object iomgr_object;
-};
-
-static void fd_global_init(void);
-static void fd_global_shutdown(void);
-
-/*******************************************************************************
- * epoll set Declarations
- */
-
-#ifndef NDEBUG
-
-#define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__)
-#define EPS_UNREF(exec_ctx, p, r) \
- eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-
-#else
-
-#define EPS_ADD_REF(p, r) eps_add_ref((p))
-#define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p))
-
-#endif
-
-typedef struct epoll_set {
- /* Mutex poller should acquire to poll this. This enforces that only one
- * poller can be polling on epoll_set at any time */
- gpr_mu mu;
-
- /* Ref count. Use EPS_ADD_REF() and EPS_UNREF() macros to increment/decrement
- the refcount. Once the ref count becomes zero, this structure is destroyed
- which means we should ensure that there is never a scenario where a
- EPS_ADD_REF() is racing with a EPS_UNREF() that just made the ref_count
- zero. */
- gpr_atm ref_count;
-
- /* Number of threads currently polling on this epoll set*/
- gpr_atm poller_count;
-
- /* Is the epoll set shutdown */
- gpr_atm is_shutdown;
-
- /* The fd of the underlying epoll set */
- int epoll_fd;
-} epoll_set;
-
-/*******************************************************************************
- * Pollset Declarations
- */
-struct grpc_pollset_worker {
- gpr_cv kick_cv;
-
- struct grpc_pollset_worker *next;
- struct grpc_pollset_worker *prev;
-};
-
-struct grpc_pollset {
- gpr_mu mu;
- struct epoll_set *eps;
-
- grpc_pollset_worker root_worker;
- bool kicked_without_pollers;
-
- bool shutting_down; /* Is the pollset shutting down ? */
- bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
- grpc_closure *shutdown_done; /* Called after after shutdown is complete */
-};
-
-/*******************************************************************************
- * Pollset-set Declarations
- */
-struct grpc_pollset_set {
- char unused;
-};
-
-/*****************************************************************************
- * Dedicated polling threads and pollsets - Declarations
- */
-
-size_t g_num_eps = 1;
-struct epoll_set **g_epoll_sets = NULL;
-gpr_atm g_next_eps;
-size_t g_num_threads_per_eps = 1;
-gpr_thd_id *g_poller_threads = NULL;
-
-/* Used as read-notifier pollsets for fds. We won't be using read notifier
- * pollsets with this polling engine. So it does not matter what pollset we
- * return */
-grpc_pollset g_read_notifier;
-
-static void add_fd_to_eps(grpc_fd *fd);
-static bool init_epoll_sets();
-static void shutdown_epoll_sets();
-static void poller_thread_loop(void *arg);
-static void start_poller_threads();
-static void shutdown_poller_threads();
-
-/*******************************************************************************
- * Common helpers
- */
-
-static bool append_error(grpc_error **composite, grpc_error *error,
- const char *desc) {
- if (error == GRPC_ERROR_NONE) return true;
- if (*composite == GRPC_ERROR_NONE) {
- *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
- }
- *composite = grpc_error_add_child(*composite, error);
- return false;
-}
-
-/*******************************************************************************
- * epoll set Definitions
- */
-
-/* The wakeup fd that is used to wake up all threads in an epoll_set informing
- that the epoll set is shutdown. This wakeup fd initialized to be readable
- and MUST NOT be consumed i.e the threads that woke up MUST NOT call
- grpc_wakeup_fd_consume_wakeup() */
-static grpc_wakeup_fd epoll_set_wakeup_fd;
-
-/* The epoll set being polled right now.
- See comments in workqueue_maybe_wakeup for why this is tracked. */
-static __thread epoll_set *g_current_thread_epoll_set;
-
-/* Forward declaration */
-static void epoll_set_delete(epoll_set *eps);
-
-#ifdef GRPC_TSAN
-/* Currently TSAN may incorrectly flag data races between epoll_ctl and
- epoll_wait for any grpc_fd structs that are added to the epoll set via
- epoll_ctl and are returned (within a very short window) via epoll_wait().
-
- To work-around this race, we establish a happens-before relation between
- the code just-before epoll_ctl() and the code after epoll_wait() by using
- this atomic */
-gpr_atm g_epoll_sync;
-#endif /* defined(GRPC_TSAN) */
-
-static void eps_add_ref(epoll_set *eps);
-static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps);
-
-#ifndef NDEBUG
-static void eps_add_ref_dbg(epoll_set *eps, const char *reason,
- const char *file, int line) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_atm old_cnt = gpr_atm_acq_load(&eps->ref_count);
- gpr_log(GPR_DEBUG, "Add ref eps: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
- " (%s) - (%s, %d)",
- eps, old_cnt, old_cnt + 1, reason, file, line);
- }
- eps_add_ref(eps);
-}
-
-static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps,
- const char *reason, const char *file, int line) {
- if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_atm old_cnt = gpr_atm_acq_load(&eps->ref_count);
- gpr_log(GPR_DEBUG, "Unref eps: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
- " (%s) - (%s, %d)",
- eps, old_cnt, (old_cnt - 1), reason, file, line);
- }
- eps_unref(exec_ctx, eps);
-}
-#endif
-
-static void eps_add_ref(epoll_set *eps) {
- gpr_atm_no_barrier_fetch_add(&eps->ref_count, 1);
-}
-
-static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps) {
- /* If ref count went to zero, delete the epoll set. This deletion is
- not done under a lock since once the ref count goes to zero, we are
- guaranteed that no one else holds a reference to the epoll set (and
- that there is no racing eps_add_ref() call either).*/
- if (1 == gpr_atm_full_fetch_add(&eps->ref_count, -1)) {
- epoll_set_delete(eps);
- }
-}
-
-static void epoll_set_add_fd_locked(epoll_set *eps, grpc_fd *fd,
- grpc_error **error) {
- int err;
- struct epoll_event ev;
- char *err_msg;
- const char *err_desc = "epoll_set_add_fd_locked";
-
-#ifdef GRPC_TSAN
- /* See the definition of g_epoll_sync for more context */
- gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
-#endif /* defined(GRPC_TSAN) */
-
- ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
- ev.data.ptr = fd;
- err = epoll_ctl(eps->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (err < 0 && errno != EEXIST) {
- gpr_asprintf(
- &err_msg,
- "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
- eps->epoll_fd, fd->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
-}
-
-static void epoll_set_add_wakeup_fd_locked(epoll_set *eps,
- grpc_wakeup_fd *wakeup_fd,
- grpc_error **error) {
- struct epoll_event ev;
- int err;
- char *err_msg;
- const char *err_desc = "epoll_set_add_wakeup_fd";
-
- ev.events = (uint32_t)(EPOLLIN | EPOLLET);
- ev.data.ptr = wakeup_fd;
- err = epoll_ctl(eps->epoll_fd, EPOLL_CTL_ADD,
- GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
- if (err < 0 && errno != EEXIST) {
- gpr_asprintf(&err_msg,
- "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
- "error: %d (%s)",
- eps->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
- strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
-}
-
-static void epoll_set_remove_fd(epoll_set *eps, grpc_fd *fd, bool is_fd_closed,
- grpc_error **error) {
- int err;
- char *err_msg;
- const char *err_desc = "epoll_set_remove_fd";
-
- /* If fd is already closed, then it would have been automatically been removed
- from the epoll set */
- if (!is_fd_closed) {
- err = epoll_ctl(eps->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
- if (err < 0 && errno != ENOENT) {
- gpr_asprintf(
- &err_msg,
- "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
- eps->epoll_fd, fd->fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- gpr_free(err_msg);
- }
- }
-}
-
-/* Might return NULL in case of an error */
-static epoll_set *epoll_set_create(grpc_error **error) {
- epoll_set *eps = NULL;
- const char *err_desc = "epoll_set_create";
-
- *error = GRPC_ERROR_NONE;
-
- eps = gpr_malloc(sizeof(*eps));
- eps->epoll_fd = -1;
-
- gpr_mu_init(&eps->mu);
-
- gpr_atm_rel_store(&eps->ref_count, 0);
- gpr_atm_rel_store(&eps->poller_count, 0);
-
- gpr_atm_rel_store(&eps->is_shutdown, false);
-
- eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
-
- if (eps->epoll_fd < 0) {
- append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
- goto done;
- }
-
-done:
- if (*error != GRPC_ERROR_NONE) {
- epoll_set_delete(eps);
- eps = NULL;
- }
- return eps;
-}
-
-static void epoll_set_delete(epoll_set *eps) {
- if (eps->epoll_fd >= 0) {
- close(eps->epoll_fd);
- }
-
- gpr_mu_destroy(&eps->mu);
-
- gpr_free(eps);
-}
-
-static grpc_error *epoll_set_global_init() {
- grpc_error *error = GRPC_ERROR_NONE;
-
- error = grpc_wakeup_fd_init(&epoll_set_wakeup_fd);
- if (error == GRPC_ERROR_NONE) {
- error = grpc_wakeup_fd_wakeup(&epoll_set_wakeup_fd);
- }
-
- return error;
-}
-
-static void epoll_set_global_shutdown() {
- grpc_wakeup_fd_destroy(&epoll_set_wakeup_fd);
-}
-
-/*******************************************************************************
- * Fd Definitions
- */
-
-/* We need to keep a freelist not because of any concerns of malloc performance
- * but instead so that implementations with multiple threads in (for example)
- * epoll_wait deal with the race between pollset removal and incoming poll
- * notifications.
- *
- * The problem is that the poller ultimately holds a reference to this
- * object, so it is very difficult to know when is safe to free it, at least
- * without some expensive synchronization.
- *
- * If we keep the object freelisted, in the worst case losing this race just
- * becomes a spurious read notification on a reused fd.
- */
-
-static grpc_fd *fd_freelist = NULL;
-static gpr_mu fd_freelist_mu;
-
-static grpc_fd *get_fd_from_freelist() {
- grpc_fd *new_fd = NULL;
-
- gpr_mu_lock(&fd_freelist_mu);
- if (fd_freelist != NULL) {
- new_fd = fd_freelist;
- fd_freelist = fd_freelist->freelist_next;
- }
- gpr_mu_unlock(&fd_freelist_mu);
- return new_fd;
-}
-
-static void add_fd_to_freelist(grpc_fd *fd) {
- gpr_mu_lock(&fd_freelist_mu);
- fd->freelist_next = fd_freelist;
- fd_freelist = fd;
- grpc_iomgr_unregister_object(&fd->iomgr_object);
-
- grpc_lfev_destroy(&fd->read_closure);
- grpc_lfev_destroy(&fd->write_closure);
-
- gpr_mu_unlock(&fd_freelist_mu);
-}
-
-static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
-
-static void fd_global_shutdown(void) {
- gpr_mu_lock(&fd_freelist_mu);
- gpr_mu_unlock(&fd_freelist_mu);
- while (fd_freelist != NULL) {
- grpc_fd *fd = fd_freelist;
- fd_freelist = fd_freelist->freelist_next;
- gpr_mu_destroy(&fd->mu);
- gpr_free(fd);
- }
- gpr_mu_destroy(&fd_freelist_mu);
-}
-
-static grpc_fd *fd_create(int fd, const char *name) {
- grpc_fd *new_fd = get_fd_from_freelist();
- if (new_fd == NULL) {
- new_fd = gpr_malloc(sizeof(grpc_fd));
- gpr_mu_init(&new_fd->mu);
- }
-
- /* Note: It is not really needed to get the new_fd->mu lock here. If this
- * is a newly created fd (or an fd we got from the freelist), no one else
- * would be holding a lock to it anyway. */
- gpr_mu_lock(&new_fd->mu);
- new_fd->eps = NULL;
-
- new_fd->fd = fd;
- new_fd->orphaned = false;
- grpc_lfev_init(&new_fd->read_closure);
- grpc_lfev_init(&new_fd->write_closure);
-
- new_fd->freelist_next = NULL;
- new_fd->on_done_closure = NULL;
-
- gpr_mu_unlock(&new_fd->mu);
-
- char *fd_name;
- gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
- grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
- gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, (void *)new_fd, fd_name);
- gpr_free(fd_name);
-
- /* Associate the fd with one of the eps */
- add_fd_to_eps(new_fd);
- return new_fd;
-}
-
-static int fd_wrapped_fd(grpc_fd *fd) {
- int ret_fd = -1;
- gpr_mu_lock(&fd->mu);
- if (!fd->orphaned) {
- ret_fd = fd->fd;
- }
- gpr_mu_unlock(&fd->mu);
-
- return ret_fd;
-}
-
-static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *on_done, int *release_fd,
- bool already_closed, const char *reason) {
- bool is_fd_closed = already_closed;
- grpc_error *error = GRPC_ERROR_NONE;
- epoll_set *unref_eps = NULL;
-
- gpr_mu_lock(&fd->mu);
- fd->on_done_closure = on_done;
-
- /* If release_fd is not NULL, we should be relinquishing control of the file
- descriptor fd->fd (but we still own the grpc_fd structure). */
- if (release_fd != NULL) {
- *release_fd = fd->fd;
- } else if (!is_fd_closed) {
- close(fd->fd);
- is_fd_closed = true;
- }
-
- fd->orphaned = true;
-
- /* Remove the fd from the epoll set */
- if (fd->eps != NULL) {
- epoll_set_remove_fd(fd->eps, fd, is_fd_closed, &error);
- unref_eps = fd->eps;
- fd->eps = NULL;
- }
-
- GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
-
- gpr_mu_unlock(&fd->mu);
-
- /* We are done with this fd. Release it (i.e add back to freelist) */
- add_fd_to_freelist(fd);
-
- if (unref_eps != NULL) {
- /* Unref stale epoll set here, outside the fd lock above.
- The epoll set owns a workqueue which owns an fd, and unreffing
- inside the lock can cause an eventual lock loop that makes TSAN very
- unhappy. */
- EPS_UNREF(exec_ctx, unref_eps, "fd_orphan");
- }
- GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
- GRPC_ERROR_UNREF(error);
-}
-
-/* This polling engine doesn't really need the read notifier functionality. So
- * it just returns a dummy read notifier pollset */
-static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx,
- grpc_fd *fd) {
- return &g_read_notifier;
-}
-
-static bool fd_is_shutdown(grpc_fd *fd) {
- return grpc_lfev_is_shutdown(&fd->read_closure);
-}
-
-/* Might be called multiple times */
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
- if (grpc_lfev_set_shutdown(exec_ctx, &fd->read_closure,
- GRPC_ERROR_REF(why))) {
- shutdown(fd->fd, SHUT_RDWR);
- grpc_lfev_set_shutdown(exec_ctx, &fd->write_closure, GRPC_ERROR_REF(why));
- }
- GRPC_ERROR_UNREF(why);
-}
-
-static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
-}
-
-static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
- grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
-}
-
-/*******************************************************************************
- * Pollset Definitions
- */
-/* TODO: sreek - Not needed anymore */
-GPR_TLS_DECL(g_current_thread_pollset);
-GPR_TLS_DECL(g_current_thread_worker);
-
-static void pollset_worker_init(grpc_pollset_worker *worker) {
- worker->next = worker->prev = NULL;
- gpr_cv_init(&worker->kick_cv);
-}
-
-/* Global state management */
-static grpc_error *pollset_global_init(void) {
- gpr_tls_init(&g_current_thread_pollset);
- gpr_tls_init(&g_current_thread_worker);
- return GRPC_ERROR_NONE;
-}
-
-static void pollset_global_shutdown(void) {
- gpr_tls_destroy(&g_current_thread_pollset);
- gpr_tls_destroy(&g_current_thread_worker);
-}
-
-static grpc_error *pollset_worker_kick(grpc_pollset_worker *worker) {
- gpr_cv_signal(&worker->kick_cv);
- return GRPC_ERROR_NONE;
-}
-
-/* Return 1 if the pollset has active threads in pollset_work (pollset must
- * be locked) */
-static int pollset_has_workers(grpc_pollset *p) {
- return p->root_worker.next != &p->root_worker;
-}
-
-static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->prev->next = worker->next;
- worker->next->prev = worker->prev;
-}
-
-static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
- if (pollset_has_workers(p)) {
- grpc_pollset_worker *w = p->root_worker.next;
- remove_worker(p, w);
- return w;
- } else {
- return NULL;
- }
-}
-
-static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->next = &p->root_worker;
- worker->prev = worker->next->prev;
- worker->prev->next = worker->next->prev = worker;
-}
-
-static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
- worker->prev = &p->root_worker;
- worker->next = worker->prev->next;
- worker->prev->next = worker->next->prev = worker;
-}
-
-/* p->mu must be held before calling this function */
-static grpc_error *pollset_kick(grpc_exec_ctx *exec_ctx, grpc_pollset *p,
- grpc_pollset_worker *specific_worker) {
- GPR_TIMER_BEGIN("pollset_kick", 0);
- GRPC_STATS_INC_POLLSET_KICK(exec_ctx);
- grpc_error *error = GRPC_ERROR_NONE;
- const char *err_desc = "Kick Failure";
- grpc_pollset_worker *worker = specific_worker;
- if (worker != NULL) {
- if (worker == GRPC_POLLSET_KICK_BROADCAST) {
- if (pollset_has_workers(p)) {
- GPR_TIMER_BEGIN("pollset_kick.broadcast", 0);
- for (worker = p->root_worker.next; worker != &p->root_worker;
- worker = worker->next) {
- if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
- append_error(&error, pollset_worker_kick(worker), err_desc);
- }
- }
- GPR_TIMER_END("pollset_kick.broadcast", 0);
- } else {
- p->kicked_without_pollers = true;
- }
- } else {
- GPR_TIMER_MARK("kicked_specifically", 0);
- if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
- append_error(&error, pollset_worker_kick(worker), err_desc);
- }
- }
- } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
- /* Since worker == NULL, it means that we can kick "any" worker on this
- pollset 'p'. If 'p' happens to be the same pollset this thread is
- currently polling (i.e in pollset_work() function), then there is no need
- to kick any other worker since the current thread can just absorb the
- kick. This is the reason why we enter this case only when
- g_current_thread_pollset is != p */
-
- GPR_TIMER_MARK("kick_anonymous", 0);
- worker = pop_front_worker(p);
- if (worker != NULL) {
- GPR_TIMER_MARK("finally_kick", 0);
- push_back_worker(p, worker);
- append_error(&error, pollset_worker_kick(worker), err_desc);
- } else {
- GPR_TIMER_MARK("kicked_no_pollers", 0);
- p->kicked_without_pollers = true;
- }
- }
-
- GPR_TIMER_END("pollset_kick", 0);
- GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
- return error;
-}
-
-static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
- gpr_mu_init(&pollset->mu);
- *mu = &pollset->mu;
- pollset->eps = NULL;
-
- pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
- pollset->kicked_without_pollers = false;
-
- pollset->shutting_down = false;
- pollset->finish_shutdown_called = false;
- pollset->shutdown_done = NULL;
-}
-
-static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
-}
-
-static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
-}
-
-static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
- char *reason) {
- if (ps->eps != NULL) {
- EPS_UNREF(exec_ctx, ps->eps, reason);
- }
- ps->eps = NULL;
-}
-
-static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
- grpc_pollset *pollset) {
- /* The pollset cannot have any workers if we are at this stage */
- GPR_ASSERT(!pollset_has_workers(pollset));
-
- pollset->finish_shutdown_called = true;
-
- /* Release the ref and set pollset->eps to NULL */
- pollset_release_epoll_set(exec_ctx, pollset, "ps_shutdown");
- GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
-}
-
-/* pollset->mu lock must be held by the caller before calling this */
-static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_closure *closure) {
- GPR_TIMER_BEGIN("pollset_shutdown", 0);
- GPR_ASSERT(!pollset->shutting_down);
- pollset->shutting_down = true;
- pollset->shutdown_done = closure;
- pollset_kick(exec_ctx, pollset, GRPC_POLLSET_KICK_BROADCAST);
-
- /* If the pollset has any workers, we cannot call finish_shutdown_locked()
- because it would release the underlying epoll set. In such a case, we
- let the last worker call finish_shutdown_locked() from pollset_work() */
- if (!pollset_has_workers(pollset)) {
- GPR_ASSERT(!pollset->finish_shutdown_called);
- GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
- finish_shutdown_locked(exec_ctx, pollset);
- }
- GPR_TIMER_END("pollset_shutdown", 0);
-}
-
-/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
- * than destroying the mutexes, there is nothing special that needs to be done
- * here */
-static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
- GPR_ASSERT(!pollset_has_workers(pollset));
- gpr_mu_destroy(&pollset->mu);
-}
-
-/* Blocking call */
-static void acquire_epoll_lease(epoll_set *eps) {
- if (g_num_threads_per_eps > 1) {
- gpr_mu_lock(&eps->mu);
- }
-}
-
-static void release_epoll_lease(epoll_set *eps) {
- if (g_num_threads_per_eps > 1) {
- gpr_mu_unlock(&eps->mu);
- }
-}
-
-#define GRPC_EPOLL_MAX_EVENTS 100
-static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps,
- grpc_error **error) {
- struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
- int ep_rv;
- char *err_msg;
- const char *err_desc = "do_epoll_wait";
-
- int timeout_ms = -1;
-
- GRPC_SCHEDULING_START_BLOCKING_REGION;
- acquire_epoll_lease(eps);
- GRPC_STATS_INC_SYSCALL_POLL(exec_ctx);
- ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms);
- release_epoll_lease(eps);
- GRPC_SCHEDULING_END_BLOCKING_REGION;
-
- if (ep_rv < 0) {
- gpr_asprintf(&err_msg,
- "epoll_wait() epoll fd: %d failed with error: %d (%s)",
- epoll_fd, errno, strerror(errno));
- append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
- }
-
-#ifdef GRPC_TSAN
- /* See the definition of g_poll_sync for more details */
- gpr_atm_acq_load(&g_epoll_sync);
-#endif /* defined(GRPC_TSAN) */
-
- for (int i = 0; i < ep_rv; ++i) {
- void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &epoll_set_wakeup_fd) {
- gpr_atm_rel_store(&eps->is_shutdown, 1);
- gpr_log(GPR_INFO, "pollset poller: shutdown set");
- } else {
- grpc_fd *fd = data_ptr;
- int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
- int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
- int write_ev = ep_ev[i].events & EPOLLOUT;
- if (read_ev || cancel) {
- fd_become_readable(exec_ctx, fd);
- }
- if (write_ev || cancel) {
- fd_become_writable(exec_ctx, fd);
- }
- }
- }
-}
-
-static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps,
- grpc_error **error) {
- int epoll_fd = -1;
- GPR_TIMER_BEGIN("epoll_set_work", 0);
-
- /* Since epoll_fd is immutable, it is safe to read it without a lock on the
- epoll set. */
- epoll_fd = eps->epoll_fd;
-
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
- g_current_thread_epoll_set = eps;
-
- do_epoll_wait(exec_ctx, epoll_fd, eps, error);
-
- g_current_thread_epoll_set = NULL;
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
-
- GPR_TIMER_END("epoll_set_work", 0);
-}
-
-/* pollset->mu lock must be held by the caller before calling this.
- The function pollset_work() may temporarily release the lock (pollset->mu)
- during the course of its execution but it will always re-acquire the lock and
- ensure that it is held by the time the function returns */
-static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_pollset_worker **worker_hdl,
- gpr_timespec now, gpr_timespec deadline) {
- GPR_TIMER_BEGIN("pollset_work", 0);
- grpc_error *error = GRPC_ERROR_NONE;
-
- grpc_pollset_worker worker;
- pollset_worker_init(&worker);
-
- if (worker_hdl) *worker_hdl = &worker;
-
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
- gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
-
- if (pollset->kicked_without_pollers) {
- /* If the pollset was kicked without pollers, pretend that the current
- worker got the kick and skip polling. A kick indicates that there is some
- work that needs attention like an event on the completion queue or an
- alarm */
- GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
- pollset->kicked_without_pollers = 0;
- } else if (!pollset->shutting_down) {
- push_front_worker(pollset, &worker);
-
- gpr_cv_wait(&worker.kick_cv, &pollset->mu,
- gpr_convert_clock_type(deadline, GPR_CLOCK_REALTIME));
- /* pollset->mu locked here */
-
- remove_worker(pollset, &worker);
- }
-
- /* If we are the last worker on the pollset (i.e pollset_has_workers() is
- false at this point) and the pollset is shutting down, we may have to
- finish the shutdown process by calling finish_shutdown_locked().
- See pollset_shutdown() for more details.
-
- Note: Continuing to access pollset here is safe; it is the caller's
- responsibility to not destroy a pollset when it has outstanding calls to
- pollset_work() */
- if (pollset->shutting_down && !pollset_has_workers(pollset) &&
- !pollset->finish_shutdown_called) {
- GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
- finish_shutdown_locked(exec_ctx, pollset);
-
- gpr_mu_unlock(&pollset->mu);
- grpc_exec_ctx_flush(exec_ctx);
- gpr_mu_lock(&pollset->mu);
- }
-
- if (worker_hdl) *worker_hdl = NULL;
-
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
- gpr_tls_set(&g_current_thread_worker, (intptr_t)0);
-
- GPR_TIMER_END("pollset_work", 0);
-
- GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
- return error;
-}
-
-static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
- grpc_fd *fd) {
- /* Nothing to do */
-}
-
-/*******************************************************************************
- * Pollset-set Definitions
- */
-grpc_pollset_set g_dummy_pollset_set;
-static grpc_pollset_set *pollset_set_create(void) {
- return &g_dummy_pollset_set;
-}
-
-static void pollset_set_destroy(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss) {
- /* Nothing to do */
-}
-
-static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
- grpc_fd *fd) {
- /* Nothing to do */
-}
-
-static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss,
- grpc_fd *fd) {
- /* Nothing to do */
-}
-
-static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {
- /* Nothing to do */
-}
-
-static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *pss, grpc_pollset *ps) {
- /* Nothing to do */
-}
-
-static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- /* Nothing to do */
-}
-
-static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
- grpc_pollset_set *bag,
- grpc_pollset_set *item) {
- /* Nothing to do */
-}
-
-/*******************************************************************************
- * Event engine binding
- */
-
-static void shutdown_engine(void) {
- shutdown_poller_threads();
- shutdown_epoll_sets();
- fd_global_shutdown();
- pollset_global_shutdown();
- epoll_set_global_shutdown();
- gpr_log(GPR_INFO, "ev-epoll-threadpool engine shutdown complete");
-}
-
-static const grpc_event_engine_vtable vtable = {
- .pollset_size = sizeof(grpc_pollset),
-
- .fd_create = fd_create,
- .fd_wrapped_fd = fd_wrapped_fd,
- .fd_orphan = fd_orphan,
- .fd_shutdown = fd_shutdown,
- .fd_is_shutdown = fd_is_shutdown,
- .fd_notify_on_read = fd_notify_on_read,
- .fd_notify_on_write = fd_notify_on_write,
- .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
-
- .pollset_init = pollset_init,
- .pollset_shutdown = pollset_shutdown,
- .pollset_destroy = pollset_destroy,
- .pollset_work = pollset_work,
- .pollset_kick = pollset_kick,
- .pollset_add_fd = pollset_add_fd,
-
- .pollset_set_create = pollset_set_create,
- .pollset_set_destroy = pollset_set_destroy,
- .pollset_set_add_pollset = pollset_set_add_pollset,
- .pollset_set_del_pollset = pollset_set_del_pollset,
- .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
- .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
- .pollset_set_add_fd = pollset_set_add_fd,
- .pollset_set_del_fd = pollset_set_del_fd,
-
- .shutdown_engine = shutdown_engine,
-};
-
-/*****************************************************************************
- * Dedicated polling threads and pollsets - Definitions
- */
-static void add_fd_to_eps(grpc_fd *fd) {
- GPR_ASSERT(fd->eps == NULL);
- GPR_TIMER_BEGIN("add_fd_to_eps", 0);
-
- grpc_error *error = GRPC_ERROR_NONE;
- size_t idx = (size_t)gpr_atm_no_barrier_fetch_add(&g_next_eps, 1) % g_num_eps;
- epoll_set *eps = g_epoll_sets[idx];
-
- gpr_mu_lock(&fd->mu);
-
- if (fd->orphaned) {
- gpr_mu_unlock(&fd->mu);
- return; /* Early out */
- }
-
- epoll_set_add_fd_locked(eps, fd, &error);
- EPS_ADD_REF(eps, "fd");
- fd->eps = eps;
-
- GRPC_POLLING_TRACE("add_fd_to_eps (fd: %d, eps idx = %" PRIdPTR ")", fd->fd,
- idx);
- gpr_mu_unlock(&fd->mu);
-
- GRPC_LOG_IF_ERROR("add_fd_to_eps", error);
- GPR_TIMER_END("add_fd_to_eps", 0);
-}
-
-static bool init_epoll_sets() {
- grpc_error *error = GRPC_ERROR_NONE;
- bool is_success = true;
-
- g_epoll_sets = (epoll_set **)malloc(g_num_eps * sizeof(epoll_set *));
-
- for (size_t i = 0; i < g_num_eps; i++) {
- g_epoll_sets[i] = epoll_set_create(&error);
- if (g_epoll_sets[i] == NULL) {
- gpr_log(GPR_ERROR, "Error in creating a epoll set");
- g_num_eps = i; /* Helps cleanup */
- shutdown_epoll_sets();
- is_success = false;
- goto done;
- }
-
- EPS_ADD_REF(g_epoll_sets[i], "init_epoll_sets");
- }
-
- gpr_atm_no_barrier_store(&g_next_eps, 0);
- gpr_mu *mu;
- pollset_init(&g_read_notifier, &mu);
-
-done:
- GRPC_LOG_IF_ERROR("init_epoll_sets", error);
- return is_success;
-}
-
-static void shutdown_epoll_sets() {
- if (!g_epoll_sets) {
- return;
- }
-
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- for (size_t i = 0; i < g_num_eps; i++) {
- EPS_UNREF(&exec_ctx, g_epoll_sets[i], "shutdown_epoll_sets");
- }
- grpc_exec_ctx_flush(&exec_ctx);
-
- gpr_free(g_epoll_sets);
- g_epoll_sets = NULL;
- pollset_destroy(&exec_ctx, &g_read_notifier);
- grpc_exec_ctx_finish(&exec_ctx);
-}
-
-static void poller_thread_loop(void *arg) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_error *error = GRPC_ERROR_NONE;
- epoll_set *eps = (epoll_set *)arg;
-
- while (!gpr_atm_acq_load(&eps->is_shutdown)) {
- epoll_set_work(&exec_ctx, eps, &error);
- grpc_exec_ctx_flush(&exec_ctx);
- }
-
- grpc_exec_ctx_finish(&exec_ctx);
- GRPC_LOG_IF_ERROR("poller_thread_loop", error);
-}
-
-/* g_epoll_sets MUST be initialized before calling this */
-static void start_poller_threads() {
- GPR_ASSERT(g_epoll_sets);
-
- gpr_log(GPR_INFO, "Starting poller threads");
-
- size_t num_threads = g_num_eps * g_num_threads_per_eps;
- g_poller_threads = (gpr_thd_id *)malloc(num_threads * sizeof(gpr_thd_id));
- gpr_thd_options options = gpr_thd_options_default();
- gpr_thd_options_set_joinable(&options);
-
- for (size_t i = 0; i < num_threads; i++) {
- gpr_thd_new(&g_poller_threads[i], poller_thread_loop,
- (void *)g_epoll_sets[i % g_num_eps], &options);
- }
-}
-
-static void shutdown_poller_threads() {
- GPR_ASSERT(g_poller_threads);
- GPR_ASSERT(g_epoll_sets);
- grpc_error *error = GRPC_ERROR_NONE;
-
- gpr_log(GPR_INFO, "Shutting down pollers");
-
- epoll_set *eps = NULL;
- size_t num_threads = g_num_eps * g_num_threads_per_eps;
- for (size_t i = 0; i < num_threads; i++) {
- eps = g_epoll_sets[i];
- epoll_set_add_wakeup_fd_locked(eps, &epoll_set_wakeup_fd, &error);
- }
-
- for (size_t i = 0; i < g_num_eps; i++) {
- gpr_thd_join(g_poller_threads[i]);
- }
-
- GRPC_LOG_IF_ERROR("shutdown_poller_threads", error);
- gpr_free(g_poller_threads);
- g_poller_threads = NULL;
-}
-
-/****************************************************************************/
-
-/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
- * Create a dummy epoll_fd to make sure epoll support is available */
-static bool is_epoll_available() {
- int fd = epoll_create1(EPOLL_CLOEXEC);
- if (fd < 0) {
- gpr_log(
- GPR_ERROR,
- "epoll_create1 failed with error: %d. Not using epoll polling engine",
- fd);
- return false;
- }
- close(fd);
- return true;
-}
-
-const grpc_event_engine_vtable *grpc_init_epoll_thread_pool_linux(
- bool requested_explicitly) {
- if (!requested_explicitly) return NULL;
-
- if (!grpc_has_wakeup_fd()) {
- return NULL;
- }
-
- if (!is_epoll_available()) {
- return NULL;
- }
-
- fd_global_init();
-
- if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
- return NULL;
- }
-
- if (!GRPC_LOG_IF_ERROR("epoll_set_global_init", epoll_set_global_init())) {
- return NULL;
- }
-
- if (!init_epoll_sets()) {
- return NULL;
- }
-
- /* TODO (sreek): Maynot be a good idea to start threads here (especially if
- * this engine doesn't get picked. Consider introducing an engine_init
- * function in the vtable */
- start_poller_threads();
- return &vtable;
-}
-
-#else /* defined(GRPC_LINUX_EPOLL) */
-#if defined(GRPC_POSIX_SOCKET)
-#include "src/core/lib/iomgr/ev_posix.h"
-/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
- * NULL */
-const grpc_event_engine_vtable *grpc_init_epoll_thread_pool_linux(
- bool requested_explicitly) {
- return NULL;
-}
-#endif /* defined(GRPC_POSIX_SOCKET) */
-#endif /* !defined(GRPC_LINUX_EPOLL) */
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.h b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.h
deleted file mode 100644
index 2ca68cc9d1..0000000000
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- * Copyright 2017 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#ifndef GRPC_CORE_LIB_IOMGR_EV_EPOLL_THREAD_POOL_LINUX_H
-#define GRPC_CORE_LIB_IOMGR_EV_EPOLL_THREAD_POOL_LINUX_H
-
-#include "src/core/lib/iomgr/ev_posix.h"
-#include "src/core/lib/iomgr/port.h"
-
-const grpc_event_engine_vtable *grpc_init_epoll_thread_pool_linux(
- bool requested_explicitly);
-
-#endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL_THREAD_POOL_LINUX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 32363765e3..2b0d01f8cd 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -31,8 +31,6 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
-#include "src/core/lib/iomgr/ev_epoll_limited_pollers_linux.h"
-#include "src/core/lib/iomgr/ev_epoll_thread_pool_linux.h"
#include "src/core/lib/iomgr/ev_epollex_linux.h"
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
@@ -66,8 +64,6 @@ typedef struct {
static const event_engine_factory g_factories[] = {
{"epoll1", grpc_init_epoll1_linux},
{"epollsig", grpc_init_epollsig_linux},
- {"epoll-threadpool", grpc_init_epoll_thread_pool_linux},
- {"epoll-limited", grpc_init_epoll_limited_pollers_linux},
{"poll", grpc_init_poll_posix},
{"poll-cv", grpc_init_poll_cv_posix},
{"epollex", grpc_init_epollex_linux},
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 3d19953eeb..1feea6d628 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -164,13 +164,7 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
bool grpc_iomgr_abort_on_leaks(void) {
char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS");
- if (env == NULL) return false;
- static const char *truthy[] = {"yes", "Yes", "YES", "true",
- "True", "TRUE", "1"};
- bool should_we = false;
- for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
- if (0 == strcmp(env, truthy[i])) should_we = true;
- }
+ bool should_we = gpr_is_true(env);
gpr_free(env);
return should_we;
}
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 793147f30a..98a2afd78b 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -250,7 +250,7 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
- msg.msg_iovlen = tcp->incoming_buffer->count;
+ msg.msg_iovlen = (msg_iovlen_type)tcp->incoming_buffer->count;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 531a88434f..dd7dd44e79 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -39,6 +39,8 @@
/* We can have a per-call credentials. */
typedef struct {
+ grpc_call_stack *owning_call;
+ grpc_call_combiner *call_combiner;
grpc_call_credentials *creds;
bool have_host;
bool have_method;
@@ -49,17 +51,12 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
- gpr_atm security_context_set;
- gpr_mu security_context_mu;
grpc_credentials_mdelem_array md_array;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
- grpc_closure closure;
- // Either 0 (no cancellation and no async operation in flight),
- // a grpc_closure* (if the lowest bit is 0),
- // or a grpc_error* (if the lowest bit is 1).
- gpr_atm cancellation_state;
- grpc_closure cancel_closure;
+ grpc_closure async_result_closure;
+ grpc_closure check_call_host_cancel_closure;
+ grpc_closure get_request_metadata_cancel_closure;
} call_data;
/* We can have a per-channel credentials. */
@@ -68,43 +65,6 @@ typedef struct {
grpc_auth_context *auth_context;
} channel_data;
-static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func,
- grpc_error **error) {
- // If the lowest bit is 1, the value is a grpc_error*.
- // Otherwise, if non-zdero, the value is a grpc_closure*.
- if (cancel_state & 1) {
- *error = (grpc_error *)(cancel_state & ~(gpr_atm)1);
- } else if (cancel_state != 0) {
- *func = (grpc_closure *)cancel_state;
- }
-}
-
-static gpr_atm encode_cancel_state_error(grpc_error *error) {
- // Set the lowest bit to 1 to indicate that it's an error.
- return (gpr_atm)1 | (gpr_atm)error;
-}
-
-// Returns an error if the call has been cancelled. Otherwise, sets the
-// cancellation function to be called upon cancellation.
-static grpc_error *set_cancel_func(grpc_call_element *elem,
- grpc_iomgr_cb_func func) {
- call_data *calld = (call_data *)elem->call_data;
- // Decode original state.
- gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *original_error = GRPC_ERROR_NONE;
- grpc_closure *original_func = NULL;
- decode_cancel_state(original_state, &original_func, &original_error);
- // If error is set, return it.
- if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error);
- // Otherwise, store func.
- GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem,
- grpc_schedule_on_exec_ctx);
- GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0);
- gpr_atm_rel_store(&calld->cancellation_state,
- (gpr_atm)&calld->cancel_closure);
- return GRPC_ERROR_NONE;
-}
-
static void reset_auth_metadata_context(
grpc_auth_metadata_context *auth_md_context) {
if (auth_md_context->service_url != NULL) {
@@ -153,7 +113,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
} else {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED);
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
+ calld->call_combiner);
}
}
@@ -191,8 +152,12 @@ static void cancel_get_request_metadata(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
- grpc_call_credentials_cancel_get_request_metadata(
- exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_call_credentials_cancel_get_request_metadata(
+ exec_ctx, calld->creds, &calld->md_array, GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "cancel_get_request_metadata");
}
static void send_security_metadata(grpc_exec_ctx *exec_ctx,
@@ -223,7 +188,8 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED));
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED),
+ calld->call_combiner);
return;
}
} else {
@@ -234,22 +200,25 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
- grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata);
- if (cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- cancel_error);
- return;
- }
GPR_ASSERT(calld->pollent != NULL);
- GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch,
- grpc_schedule_on_exec_ctx);
+
+ GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata,
+ batch, grpc_schedule_on_exec_ctx);
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- &calld->md_array, &calld->closure, &error)) {
+ &calld->md_array, &calld->async_result_closure, &error)) {
// Synchronous return; invoke on_credentials_metadata() directly.
on_credentials_metadata(exec_ctx, batch, error);
GRPC_ERROR_UNREF(error);
+ } else {
+ // Async return; register cancellation closure with call combiner.
+ GRPC_CALL_STACK_REF(calld->owning_call, "cancel_get_request_metadata");
+ grpc_call_combiner_set_notify_on_cancel(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->get_request_metadata_cancel_closure,
+ cancel_get_request_metadata, elem,
+ grpc_schedule_on_exec_ctx));
}
}
@@ -258,7 +227,6 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
-
if (error == GRPC_ERROR_NONE) {
send_security_metadata(exec_ctx, elem, batch);
} else {
@@ -271,7 +239,8 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAUTHENTICATED));
+ GRPC_STATUS_UNAUTHENTICATED),
+ calld->call_combiner);
gpr_free(error_msg);
}
}
@@ -281,9 +250,12 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg,
grpc_call_element *elem = (grpc_call_element *)arg;
call_data *calld = (call_data *)elem->call_data;
channel_data *chand = (channel_data *)elem->channel_data;
- grpc_channel_security_connector_cancel_check_call_host(
- exec_ctx, chand->security_connector, &calld->closure,
- GRPC_ERROR_REF(error));
+ if (error != GRPC_ERROR_NONE) {
+ grpc_channel_security_connector_cancel_check_call_host(
+ exec_ctx, chand->security_connector, &calld->async_result_closure,
+ GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_check_call_host");
}
static void auth_start_transport_stream_op_batch(
@@ -295,52 +267,19 @@ static void auth_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (batch->cancel_stream) {
- while (true) {
- // Decode the original cancellation state.
- gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *cancel_error = GRPC_ERROR_NONE;
- grpc_closure *func = NULL;
- decode_cancel_state(original_state, &func, &cancel_error);
- // If we had already set a cancellation error, there's nothing
- // more to do.
- if (cancel_error != GRPC_ERROR_NONE) break;
- // If there's a cancel func, call it.
- // Note that even if the cancel func has been changed by some
- // other thread between when we decoded it and now, it will just
- // be a no-op.
- cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- if (func != NULL) {
- GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error));
- }
- // Encode the new error into cancellation state.
- if (gpr_atm_full_cas(&calld->cancellation_state, original_state,
- encode_cancel_state_error(cancel_error))) {
- break; // Success.
- }
- // The cas failed, so try again.
- }
- } else {
- /* double checked lock over security context to ensure it's set once */
- if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- gpr_mu_lock(&calld->security_context_mu);
- if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- GPR_ASSERT(batch->payload->context != NULL);
- if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- batch->payload->context[GRPC_CONTEXT_SECURITY].value =
- grpc_client_security_context_create();
- batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
- grpc_client_security_context_destroy;
- }
- grpc_client_security_context *sec_ctx =
- batch->payload->context[GRPC_CONTEXT_SECURITY].value;
- GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
- sec_ctx->auth_context =
- GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
- gpr_atm_rel_store(&calld->security_context_set, 1);
- }
- gpr_mu_unlock(&calld->security_context_mu);
+ if (!batch->cancel_stream) {
+ GPR_ASSERT(batch->payload->context != NULL);
+ if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
}
+ grpc_client_security_context *sec_ctx =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context =
+ GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (batch->send_initial_metadata) {
@@ -365,26 +304,27 @@ static void auth_start_transport_stream_op_batch(
}
}
if (calld->have_host) {
- grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host);
- if (cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- cancel_error);
+ batch->handler_private.extra_arg = elem;
+ GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch,
+ grpc_schedule_on_exec_ctx);
+ char *call_host = grpc_slice_to_c_string(calld->host);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_channel_security_connector_check_call_host(
+ exec_ctx, chand->security_connector, call_host,
+ chand->auth_context, &calld->async_result_closure, &error)) {
+ // Synchronous return; invoke on_host_checked() directly.
+ on_host_checked(exec_ctx, batch, error);
+ GRPC_ERROR_UNREF(error);
} else {
- char *call_host = grpc_slice_to_c_string(calld->host);
- batch->handler_private.extra_arg = elem;
- grpc_error *error = GRPC_ERROR_NONE;
- if (grpc_channel_security_connector_check_call_host(
- exec_ctx, chand->security_connector, call_host,
- chand->auth_context,
- GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch,
- grpc_schedule_on_exec_ctx),
- &error)) {
- // Synchronous return; invoke on_host_checked() directly.
- on_host_checked(exec_ctx, batch, error);
- GRPC_ERROR_UNREF(error);
- }
- gpr_free(call_host);
+ // Async return; register cancellation closure with call combiner.
+ GRPC_CALL_STACK_REF(calld->owning_call, "cancel_check_call_host");
+ grpc_call_combiner_set_notify_on_cancel(
+ exec_ctx, calld->call_combiner,
+ GRPC_CLOSURE_INIT(&calld->check_call_host_cancel_closure,
+ cancel_check_call_host, elem,
+ grpc_schedule_on_exec_ctx));
}
+ gpr_free(call_host);
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
@@ -400,8 +340,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- memset(calld, 0, sizeof(*calld));
- gpr_mu_init(&calld->security_context_mu);
+ calld->owning_call = args->call_stack;
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -426,12 +366,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_slice_unref_internal(exec_ctx, calld->method);
}
reset_auth_metadata_context(&calld->auth_md_context);
- gpr_mu_destroy(&calld->security_context_mu);
- gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *cancel_error = GRPC_ERROR_NONE;
- grpc_closure *cancel_func = NULL;
- decode_cancel_state(cancel_state, &cancel_func, &cancel_error);
- GRPC_ERROR_UNREF(cancel_error);
}
/* Constructor for channel_data */
@@ -490,6 +424,5 @@ const grpc_channel_filter grpc_client_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client-auth"};
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 9bf3f0ca0f..7f523c0883 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -26,7 +26,15 @@
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/slice/slice_internal.h"
+typedef enum {
+ STATE_INIT = 0,
+ STATE_DONE,
+ STATE_CANCELLED,
+} async_state;
+
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
+ grpc_call_stack *owning_call;
grpc_transport_stream_op_batch *recv_initial_metadata_batch;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
@@ -34,6 +42,8 @@ typedef struct call_data {
const grpc_metadata *consumed_md;
size_t num_consumed_md;
grpc_auth_context *auth_context;
+ grpc_closure cancel_closure;
+ gpr_atm state; // async_state
} call_data;
typedef struct channel_data {
@@ -78,54 +88,94 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx,
return GRPC_FILTERED_MDELEM(md);
}
-/* called from application code */
-static void on_md_processing_done(
- void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
- const grpc_metadata *response_md, size_t num_response_md,
- grpc_status_code status, const char *error_details) {
- grpc_call_element *elem = user_data;
+static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_metadata *consumed_md,
+ size_t num_consumed_md,
+ const grpc_metadata *response_md,
+ size_t num_response_md,
+ grpc_error *error) {
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
- grpc_error *error = GRPC_ERROR_NONE;
- if (status == GRPC_STATUS_OK) {
+ if (error == GRPC_ERROR_NONE) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
error = grpc_metadata_batch_filter(
- &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
- } else {
- if (error_details == NULL) {
- error_details = "Authentication metadata processing failed.";
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
+}
+
+// Called from application code.
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // If the call was not cancelled while we were in flight, process the result.
+ if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
+ (gpr_atm)STATE_DONE)) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (status != GRPC_STATUS_OK) {
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
+ }
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
}
- error =
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status);
+ on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md,
+ response_md, num_response_md, error);
}
+ // Clean up.
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
- error);
+ GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata");
grpc_exec_ctx_finish(&exec_ctx);
}
+static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = elem->call_data;
+ // If the result was not already processed, invoke the callback now.
+ if (error != GRPC_ERROR_NONE &&
+ gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
+ (gpr_atm)STATE_CANCELLED)) {
+ on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0,
+ GRPC_ERROR_REF(error));
+ }
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "cancel_call");
+}
+
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *elem = arg;
+ grpc_call_element *elem = (grpc_call_element *)arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
+ // We're calling out to the application, so we need to make sure
+ // to drop the call combiner early if we get cancelled.
+ GRPC_CALL_STACK_REF(calld->owning_call, "cancel_call");
+ GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
+ &calld->cancel_closure);
+ GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata");
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
@@ -159,6 +209,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ calld->call_combiner = args->call_combiner;
+ calld->owning_call = args->call_stack;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -218,6 +270,5 @@ const grpc_channel_filter grpc_server_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server-auth"};
diff --git a/src/core/lib/support/string.c b/src/core/lib/support/string.c
index b65009754a..ec93303024 100644
--- a/src/core/lib/support/string.c
+++ b/src/core/lib/support/string.c
@@ -298,3 +298,16 @@ void *gpr_memrchr(const void *s, int c, size_t n) {
}
return NULL;
}
+
+bool gpr_is_true(const char *s) {
+ if (s == NULL) {
+ return false;
+ }
+ static const char *truthy[] = {"yes", "true", "1"};
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) {
+ if (0 == gpr_stricmp(s, truthy[i])) {
+ return true;
+ }
+ }
+ return false;
+}
diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h
index e11df8439d..5a56fa3a0a 100644
--- a/src/core/lib/support/string.h
+++ b/src/core/lib/support/string.h
@@ -19,6 +19,7 @@
#ifndef GRPC_CORE_LIB_SUPPORT_STRING_H
#define GRPC_CORE_LIB_SUPPORT_STRING_H
+#include <stdbool.h>
#include <stddef.h>
#include <grpc/support/port_platform.h>
@@ -106,6 +107,8 @@ int gpr_stricmp(const char *a, const char *b);
void *gpr_memrchr(const void *s, int c, size_t n);
+/** Return true if lower(s) equals "true", "yes" or "1", otherwise false. */
+bool gpr_is_true(const char *s);
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2599765510..347cb9a303 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -122,6 +122,7 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
+ grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -151,6 +152,7 @@ typedef struct {
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
+ grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -184,6 +186,11 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
+ grpc_metadata compression_md;
+
+ // A char* indicating the peer name.
+ gpr_atm peer_string;
+
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -262,8 +269,9 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op);
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op,
+ grpc_closure *start_batch_closure);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -329,6 +337,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
+ grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -437,7 +446,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena};
+ .arena = call->arena,
+ .call_combiner = &call->call_combiner};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -504,6 +514,8 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_call *c = call;
grpc_channel *channel = c->channel;
+ grpc_call_combiner_destroy(&c->call_combiner);
+ gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -587,6 +599,12 @@ void grpc_call_unref(grpc_call *c) {
if (cancel) {
cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE,
GRPC_ERROR_CANCELLED);
+ } else {
+ // Unset the call combiner cancellation closure. This has the
+ // effect of scheduling the previously set cancellation closure, if
+ // any, so that it can release any internal references it may be
+ // holding to the call stack.
+ grpc_call_combiner_set_notify_on_cancel(&exec_ctx, &c->call_combiner, NULL);
}
GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
grpc_exec_ctx_finish(&exec_ctx);
@@ -603,30 +621,37 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op) {
- grpc_call_element *elem;
-
- GPR_TIMER_BEGIN("execute_op", 0);
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
- GPR_TIMER_END("execute_op", 0);
+// This is called via the call combiner to start sending a batch down
+// the filter stack.
+static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *ignored) {
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call *call = batch->handler_private.extra_arg;
+ GPR_TIMER_BEGIN("execute_batch", 0);
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+ GPR_TIMER_END("execute_batch", 0);
+}
+
+// start_batch_closure points to a caller-allocated closure to be used
+// for entering the call combiner.
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *batch,
+ grpc_closure *start_batch_closure) {
+ batch->handler_private.extra_arg = call;
+ GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
+ GRPC_ERROR_NONE, "executing batch");
}
char *grpc_call_get_peer(grpc_call *call) {
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *result;
- GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
- result = elem->filter->get_peer(&exec_ctx, elem);
- if (result == NULL) {
- result = grpc_channel_get_target(call->channel);
- }
- if (result == NULL) {
- result = gpr_strdup("unknown");
- }
- grpc_exec_ctx_finish(&exec_ctx);
- return result;
+ char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
+ if (peer_string != NULL) return gpr_strdup(peer_string);
+ peer_string = grpc_channel_get_target(call->channel);
+ if (peer_string != NULL) return peer_string;
+ return gpr_strdup("unknown");
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -653,20 +678,41 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
+typedef struct {
+ grpc_call *call;
+ grpc_closure start_batch;
+ grpc_closure finish_batch;
+} cancel_state;
+
+// The on_complete callback used when sending a cancel_stream batch down
+// the filter stack. Yields the call combiner when the batch is done.
+static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
+ cancel_state *state = (cancel_state *)arg;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
+ "on_complete for cancel_stream op");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
+ gpr_free(state);
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
+ // Inform the call combiner of the cancellation, so that it can cancel
+ // any in-flight asynchronous actions that may be holding the call
+ // combiner. This ensures that the cancel_stream batch can be sent
+ // down the filter stack in a timely manner.
+ grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
- GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
+ cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
+ state->call = c;
+ GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_transport_stream_op_batch *op =
+ grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_op(exec_ctx, c, op);
+ execute_batch(exec_ctx, c, op, &state->start_batch);
}
static grpc_error *error_from_status(grpc_status_code status,
@@ -1432,6 +1478,18 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
+// The recv_message_ready callback used when sending a batch containing
+// a recv_message op down the filter stack. Yields the call combiner
+// before processing the received message.
+static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *bctlp,
+ grpc_error *error) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
+ receiving_stream_ready(exec_ctx, bctlp, error);
+}
+
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
@@ -1538,6 +1596,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
+ "recv_initial_metadata_ready");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1591,7 +1652,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
-
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1611,9 +1673,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
- // sent_initial_metadata guards against variable reuse.
- grpc_metadata compression_md;
-
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1661,7 +1720,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&compression_md, 0, sizeof(compression_md));
+ memset(&call->compression_md, 0, sizeof(call->compression_md));
size_t additional_metadata_count = 0;
grpc_compression_level effective_compression_level =
GRPC_COMPRESS_LEVEL_NONE;
@@ -1699,9 +1758,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_stream_compression_algorithm calgo =
stream_compression_algorithm_for_level_locked(
call, effective_stream_compression_level);
- compression_md.key =
+ call->compression_md.key =
GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
- compression_md.value =
+ call->compression_md.value =
grpc_stream_compression_algorithm_slice(calgo);
} else {
const grpc_compression_algorithm calgo =
@@ -1709,8 +1768,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, effective_compression_level);
/* the following will be picked up by the compress filter and used
* as the call's compression algorithm. */
- compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- compression_md.value = grpc_compression_algorithm_slice(calgo);
+ call->compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ call->compression_md.value =
+ grpc_compression_algorithm_slice(calgo);
additional_metadata_count++;
}
}
@@ -1725,7 +1786,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &compression_md, (int)additional_metadata_count)) {
+ &call->compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1737,6 +1798,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
+ if (call->is_client) {
+ stream_op_payload->send_initial_metadata.peer_string =
+ &call->peer_string;
+ }
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1869,6 +1934,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
+ if (!call->is_client) {
+ stream_op_payload->recv_initial_metadata.peer_string =
+ &call->peer_string;
+ }
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1885,8 +1954,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
- bctl, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
+ receiving_stream_ready_in_call_combiner, bctl,
+ grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
@@ -1956,7 +2026,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_op(exec_ctx, call, stream_op);
+ execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 185bfccb77..d537637cbb 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -19,6 +19,10 @@
#ifndef GRPC_CORE_LIB_SURFACE_CALL_H
#define GRPC_CORE_LIB_SURFACE_CALL_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/context.h"
#include "src/core/lib/surface/api_trace.h"
@@ -26,10 +30,6 @@
#include <grpc/grpc.h>
#include <grpc/impl/codegen/compression_types.h>
-#ifdef __cplusplus
-extern "C" {
-#endif
-
typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
grpc_call *call, int success,
void *user_data);
@@ -89,7 +89,7 @@ grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
/* Given the top call_element, get the call object. */
grpc_call *grpc_call_from_top_element(grpc_call_element *surface_element);
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag);
diff --git a/src/core/lib/surface/call_log_batch.c b/src/core/lib/surface/call_log_batch.c
index 4443aba58a..4a1c265817 100644
--- a/src/core/lib/surface/call_log_batch.c
+++ b/src/core/lib/surface/call_log_batch.c
@@ -103,7 +103,7 @@ char *grpc_op_string(const grpc_op *op) {
return out;
}
-void grpc_call_log_batch(char *file, int line, gpr_log_severity severity,
+void grpc_call_log_batch(const char *file, int line, gpr_log_severity severity,
grpc_call *call, const grpc_op *ops, size_t nops,
void *tag) {
char *tmp;
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 898476daee..280315036f 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -31,6 +31,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -129,6 +130,7 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_trace_channel_stack_builder);
grpc_register_tracer(&grpc_http1_trace);
grpc_register_tracer(&grpc_cq_pluck_trace); // default on
+ grpc_register_tracer(&grpc_call_combiner_trace);
grpc_register_tracer(&grpc_combiner_trace);
grpc_register_tracer(&grpc_server_channel_trace);
grpc_register_tracer(&grpc_bdp_estimator_trace);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index a0791080a9..6286f9159d 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -40,6 +40,7 @@ namespace grpc_core {
namespace {
struct CallData {
+ grpc_call_combiner *call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::atomic<bool> filled_metadata;
@@ -52,14 +53,14 @@ struct ChannelData {
static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *mdb) {
- CallData *calld = static_cast<CallData *>(elem->call_data);
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
expected, true, grpc_core::memory_order_relaxed,
grpc_core::memory_order_relaxed)) {
return;
}
- ChannelData *chand = static_cast<ChannelData *>(elem->channel_data);
+ ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data);
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
@@ -79,6 +80,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void lame_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
@@ -87,12 +89,8 @@ static void lame_start_transport_stream_op_batch(
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
-}
-
-static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return NULL;
+ exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
+ calld->call_combiner);
}
static void lame_get_channel_info(grpc_exec_ctx *exec_ctx,
@@ -122,6 +120,8 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -156,7 +156,6 @@ extern "C" const grpc_channel_filter grpc_lame_filter = {
sizeof(grpc_core::ChannelData),
grpc_core::init_channel_elem,
grpc_core::destroy_channel_elem,
- grpc_core::lame_get_peer,
grpc_core::lame_get_channel_info,
"lame-client",
};
@@ -176,7 +175,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
- auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data);
+ auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 66dcc299aa..8582d826ca 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -789,7 +789,6 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -962,7 +961,6 @@ const grpc_channel_filter grpc_server_top_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server",
};
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index 8f24b8527c..a077052561 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -105,6 +105,7 @@ static grpc_error *maybe_link_callout(grpc_metadata_batch *batch,
return GRPC_ERROR_NONE;
}
if (batch->idx.array[idx] == NULL) {
+ if (grpc_static_callout_is_default[idx]) ++batch->list.default_count;
batch->idx.array[idx] = storage;
return GRPC_ERROR_NONE;
}
@@ -120,6 +121,7 @@ static void maybe_unlink_callout(grpc_metadata_batch *batch,
if (idx == GRPC_BATCH_CALLOUTS_COUNT) {
return;
}
+ if (grpc_static_callout_is_default[idx]) --batch->list.default_count;
GPR_ASSERT(batch->idx.array[idx] != NULL);
batch->idx.array[idx] = NULL;
}
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index 1b11a3e252..57d298c75c 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -41,6 +41,7 @@ typedef struct grpc_linked_mdelem {
typedef struct grpc_mdelem_list {
size_t count;
+ size_t default_count; // Number of default keys.
grpc_linked_mdelem *head;
grpc_linked_mdelem *tail;
} grpc_mdelem_list;
diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c
index 28f05d5c44..b20d94aeac 100644
--- a/src/core/lib/transport/static_metadata.c
+++ b/src/core/lib/transport/static_metadata.c
@@ -823,6 +823,31 @@ grpc_mdelem_data grpc_static_mdelem_table[GRPC_STATIC_MDELEM_COUNT] = {
{.refcount = &grpc_static_metadata_refcounts[97],
.data.refcounted = {g_bytes + 1040, 13}}},
};
+bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT] = {
+ true, // :path
+ true, // :method
+ true, // :status
+ true, // :authority
+ true, // :scheme
+ true, // te
+ true, // grpc-message
+ true, // grpc-status
+ true, // grpc-payload-bin
+ true, // grpc-encoding
+ true, // grpc-accept-encoding
+ true, // grpc-server-stats-bin
+ true, // grpc-tags-bin
+ true, // grpc-trace-bin
+ true, // content-type
+ true, // content-encoding
+ true, // accept-encoding
+ true, // grpc-internal-encoding-request
+ true, // grpc-internal-stream-encoding-request
+ true, // user-agent
+ true, // host
+ true, // lb-token
+};
+
const uint8_t grpc_static_accept_encoding_metadata[8] = {0, 76, 77, 78,
79, 80, 81, 82};
diff --git a/src/core/lib/transport/static_metadata.h b/src/core/lib/transport/static_metadata.h
index 93ab90dff8..f03a9d23b1 100644
--- a/src/core/lib/transport/static_metadata.h
+++ b/src/core/lib/transport/static_metadata.h
@@ -571,6 +571,8 @@ typedef union {
GRPC_BATCH_CALLOUTS_COUNT) \
: GRPC_BATCH_CALLOUTS_COUNT)
+extern bool grpc_static_callout_is_default[GRPC_BATCH_CALLOUTS_COUNT];
+
extern const uint8_t grpc_static_accept_encoding_metadata[8];
#define GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(algs) \
(GRPC_MAKE_MDELEM( \
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 6c61f4b8d9..650b0559aa 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -197,11 +197,6 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
then_schedule_closure);
}
-char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport) {
- return transport->vtable->get_peer(exec_ctx, transport);
-}
-
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport) {
return transport->vtable->get_endpoint(exec_ctx, transport);
@@ -214,24 +209,24 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
-
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
- grpc_error *error) {
+ grpc_error *error, grpc_call_combiner *call_combiner) {
if (batch->send_message) {
grpc_byte_stream_destroy(exec_ctx,
batch->payload->send_message.send_message);
}
if (batch->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx,
- batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error));
+ GRPC_CALL_COMBINER_START(exec_ctx, call_combiner,
+ batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error),
+ "failing recv_message_ready");
}
if (batch->recv_initial_metadata) {
- GRPC_CLOSURE_SCHED(
- exec_ctx,
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, call_combiner,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_REF(error));
+ GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
if (batch->cancel_stream) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 099138ea14..fbf5dcb8b5 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -22,6 +22,7 @@
#include <stddef.h>
#include "src/core/lib/channel/context.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
@@ -152,6 +153,9 @@ struct grpc_transport_stream_op_batch_payload {
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
+ // If non-NULL, will be set by the transport to the peer string
+ // (a char*, which the caller takes ownership of).
+ gpr_atm *peer_string;
} send_initial_metadata;
struct {
@@ -176,6 +180,9 @@ struct grpc_transport_stream_op_batch_payload {
// immediately available. This may be a signal that we received a
// Trailers-Only response.
bool *trailing_metadata_available;
+ // If non-NULL, will be set by the transport to the peer string
+ // (a char*, which the caller takes ownership of).
+ gpr_atm *peer_string;
} recv_initial_metadata;
struct {
@@ -293,7 +300,7 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
- grpc_error *error);
+ grpc_error *error, grpc_call_combiner *call_combiner);
char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op);
char *grpc_transport_op_string(grpc_transport_op *op);
@@ -332,10 +339,6 @@ void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
-/* Get the transports peer */
-char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport);
-
/* Get the endpoint used by \a transport */
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index fc772c6dd1..bbae69c223 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -59,9 +59,6 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
- /* implementation of grpc_transport_get_peer */
- char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
-
/* implementation of grpc_transport_get_endpoint */
grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
} grpc_transport_vtable;
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index 7b18229ba6..409a6c4103 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -112,6 +112,13 @@ char *grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
+ if (op->collect_stats) {
+ gpr_strvec_add(&b, gpr_strdup(" "));
+ gpr_asprintf(&tmp, "COLLECT_STATS:%p",
+ op->payload->collect_stats.collect_stats);
+ gpr_strvec_add(&b, tmp);
+ }
+
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);