aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/http_server_filter.c2
-rw-r--r--src/core/lib/channel/max_age_filter.c386
-rw-r--r--src/core/lib/channel/max_age_filter.h39
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c135
-rw-r--r--src/core/lib/surface/init.c4
5 files changed, 488 insertions, 78 deletions
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index 6b1a02efa1..df0f95010b 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -215,7 +215,7 @@ static grpc_error *server_filter_incoming_metadata(grpc_exec_ctx *exec_ctx,
size_t path_length = GRPC_SLICE_LENGTH(path_slice);
/* offset of the character '?' */
size_t offset = 0;
- for (offset = 0; *path_ptr != k_query_separator && offset < path_length;
+ for (offset = 0; offset < path_length && *path_ptr != k_query_separator;
path_ptr++, offset++)
;
if (offset < path_length) {
diff --git a/src/core/lib/channel/max_age_filter.c b/src/core/lib/channel/max_age_filter.c
new file mode 100644
index 0000000000..c25481486c
--- /dev/null
+++ b/src/core/lib/channel/max_age_filter.c
@@ -0,0 +1,386 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/lib/channel/message_size_filter.h"
+
+#include <limits.h>
+#include <string.h>
+
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/transport/http2_errors.h"
+#include "src/core/lib/transport/service_config.h"
+
+#define DEFAULT_MAX_CONNECTION_AGE_MS INT_MAX
+#define DEFAULT_MAX_CONNECTION_AGE_GRACE_MS INT_MAX
+#define DEFAULT_MAX_CONNECTION_IDLE_MS INT_MAX
+
+typedef struct channel_data {
+ /* We take a reference to the channel stack for the timer callback */
+ grpc_channel_stack* channel_stack;
+ /* Guards access to max_age_timer, max_age_timer_pending, max_age_grace_timer
+ and max_age_grace_timer_pending */
+ gpr_mu max_age_timer_mu;
+ /* True if the max_age timer callback is currently pending */
+ bool max_age_timer_pending;
+ /* True if the max_age_grace timer callback is currently pending */
+ bool max_age_grace_timer_pending;
+ /* The timer for checking if the channel has reached its max age */
+ grpc_timer max_age_timer;
+ /* The timer for checking if the max-aged channel has uesed up the grace
+ period */
+ grpc_timer max_age_grace_timer;
+ /* The timer for checking if the channel's idle duration reaches
+ max_connection_idle */
+ grpc_timer max_idle_timer;
+ /* Allowed max time a channel may have no outstanding rpcs */
+ gpr_timespec max_connection_idle;
+ /* Allowed max time a channel may exist */
+ gpr_timespec max_connection_age;
+ /* Allowed grace period after the channel reaches its max age */
+ gpr_timespec max_connection_age_grace;
+ /* Closure to run when the channel's idle duration reaches max_connection_idle
+ and should be closed gracefully */
+ grpc_closure close_max_idle_channel;
+ /* Closure to run when the channel reaches its max age and should be closed
+ gracefully */
+ grpc_closure close_max_age_channel;
+ /* Closure to run the channel uses up its max age grace time and should be
+ closed forcibly */
+ grpc_closure force_close_max_age_channel;
+ /* Closure to run when the init fo channel stack is done and the max_idle
+ timer should be started */
+ grpc_closure start_max_idle_timer_after_init;
+ /* Closure to run when the init fo channel stack is done and the max_age timer
+ should be started */
+ grpc_closure start_max_age_timer_after_init;
+ /* Closure to run when the goaway op is finished and the max_age_timer */
+ grpc_closure start_max_age_grace_timer_after_goaway_op;
+ /* Closure to run when the channel connectivity state changes */
+ grpc_closure channel_connectivity_changed;
+ /* Records the current connectivity state */
+ grpc_connectivity_state connectivity_state;
+ /* Number of active calls */
+ gpr_atm call_count;
+} channel_data;
+
+/* Increase the nubmer of active calls. Before the increasement, if there are no
+ calls, the max_idle_timer should be cancelled. */
+static void increase_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
+ if (gpr_atm_full_fetch_add(&chand->call_count, 1) == 0) {
+ grpc_timer_cancel(exec_ctx, &chand->max_idle_timer);
+ }
+}
+
+/* Decrease the nubmer of active calls. After the decrement, if there are no
+ calls, the max_idle_timer should be started. */
+static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) {
+ if (gpr_atm_full_fetch_add(&chand->call_count, -1) == 1) {
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_idle_timer");
+ grpc_timer_init(
+ exec_ctx, &chand->max_idle_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_idle),
+ &chand->close_max_idle_channel, gpr_now(GPR_CLOCK_MONOTONIC));
+ }
+}
+
+static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ /* Decrease call_count. If there are no active calls at this time,
+ max_idle_timer will start here. If the number of active calls is not 0,
+ max_idle_timer will start after all the active calls end. */
+ decrease_call_count(exec_ctx, chand);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age start_max_idle_timer_after_init");
+}
+
+static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_timer_pending = true;
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer");
+ grpc_timer_init(
+ exec_ctx, &chand->max_age_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), chand->max_connection_age),
+ &chand->close_max_age_channel, gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op->connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(exec_ctx,
+ grpc_channel_stack_element(chand->channel_stack, 0), op);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age start_max_age_timer_after_init");
+}
+
+static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx,
+ void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_grace_timer_pending = true;
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer");
+ grpc_timer_init(exec_ctx, &chand->max_age_grace_timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ chand->max_connection_age_grace),
+ &chand->force_close_max_age_channel,
+ gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age start_max_age_grace_timer_after_goaway_op");
+}
+
+static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_atm_no_barrier_fetch_add(&chand->call_count, 1);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->goaway_error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_idle"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(chand->channel_stack, 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+ } else if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("close_max_idle_channel", error);
+ }
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age max_idle_timer");
+}
+
+static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_timer_pending = false;
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ if (error == GRPC_ERROR_NONE) {
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack,
+ "max_age start_max_age_grace_timer_after_goaway_op");
+ grpc_transport_op* op = grpc_make_transport_op(
+ &chand->start_max_age_grace_timer_after_goaway_op);
+ op->goaway_error =
+ grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("max_age"),
+ GRPC_ERROR_INT_HTTP2_ERROR, GRPC_HTTP2_NO_ERROR);
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(chand->channel_stack, 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+ } else if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("close_max_age_channel", error);
+ }
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age max_age_timer");
+}
+
+static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ chand->max_age_grace_timer_pending = false;
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ if (error == GRPC_ERROR_NONE) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->disconnect_with_error =
+ GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel reaches max age");
+ grpc_channel_element* elem =
+ grpc_channel_stack_element(chand->channel_stack, 0);
+ elem->filter->start_transport_op(exec_ctx, elem, op);
+ } else if (error != GRPC_ERROR_CANCELLED) {
+ GRPC_LOG_IF_ERROR("force_close_max_age_channel", error);
+ }
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->channel_stack,
+ "max_age max_age_grace_timer");
+}
+
+static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ channel_data* chand = arg;
+ if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
+ grpc_transport_op* op = grpc_make_transport_op(NULL);
+ op->on_connectivity_state_change = &chand->channel_connectivity_changed,
+ op->connectivity_state = &chand->connectivity_state;
+ grpc_channel_next_op(
+ exec_ctx, grpc_channel_stack_element(chand->channel_stack, 0), op);
+ } else {
+ gpr_mu_lock(&chand->max_age_timer_mu);
+ if (chand->max_age_timer_pending) {
+ grpc_timer_cancel(exec_ctx, &chand->max_age_timer);
+ chand->max_age_timer_pending = false;
+ }
+ if (chand->max_age_grace_timer_pending) {
+ grpc_timer_cancel(exec_ctx, &chand->max_age_grace_timer);
+ chand->max_age_grace_timer_pending = false;
+ }
+ gpr_mu_unlock(&chand->max_age_timer_mu);
+ /* If there are no active calls, this increasement will cancel
+ max_idle_timer, and prevent max_idle_timer from being started in the
+ future. */
+ increase_call_count(exec_ctx, chand);
+ }
+}
+
+/* Constructor for call_data. */
+static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
+ grpc_call_element* elem,
+ const grpc_call_element_args* args) {
+ channel_data* chand = elem->channel_data;
+ increase_call_count(exec_ctx, chand);
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for call_data. */
+static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ const grpc_call_final_info* final_info,
+ grpc_closure* ignored) {
+ channel_data* chand = elem->channel_data;
+ decrease_call_count(exec_ctx, chand);
+}
+
+/* Constructor for channel_data. */
+static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem,
+ grpc_channel_element_args* args) {
+ channel_data* chand = elem->channel_data;
+ gpr_mu_init(&chand->max_age_timer_mu);
+ chand->max_age_timer_pending = false;
+ chand->max_age_grace_timer_pending = false;
+ chand->channel_stack = args->channel_stack;
+ chand->max_connection_age =
+ DEFAULT_MAX_CONNECTION_AGE_MS == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_MS, GPR_TIMESPAN);
+ chand->max_connection_age_grace =
+ DEFAULT_MAX_CONNECTION_AGE_GRACE_MS == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_AGE_GRACE_MS,
+ GPR_TIMESPAN);
+ chand->max_connection_idle =
+ DEFAULT_MAX_CONNECTION_IDLE_MS == INT_MAX
+ ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(DEFAULT_MAX_CONNECTION_IDLE_MS, GPR_TIMESPAN);
+ for (size_t i = 0; i < args->channel_args->num_args; ++i) {
+ if (0 == strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_CONNECTION_AGE_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_MS, 1, INT_MAX});
+ chand->max_connection_age =
+ value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
+ } else if (0 == strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_CONNECTION_AGE_GRACE_MS, 0,
+ INT_MAX});
+ chand->max_connection_age_grace =
+ value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
+ } else if (0 == strcmp(args->channel_args->args[i].key,
+ GRPC_ARG_MAX_CONNECTION_IDLE_MS)) {
+ const int value = grpc_channel_arg_get_integer(
+ &args->channel_args->args[i],
+ (grpc_integer_options){DEFAULT_MAX_CONNECTION_IDLE_MS, 1, INT_MAX});
+ chand->max_connection_idle =
+ value == INT_MAX ? gpr_inf_future(GPR_TIMESPAN)
+ : gpr_time_from_millis(value, GPR_TIMESPAN);
+ }
+ }
+ grpc_closure_init(&chand->close_max_idle_channel, close_max_idle_channel,
+ chand, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->close_max_age_channel, close_max_age_channel, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->force_close_max_age_channel,
+ force_close_max_age_channel, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->start_max_idle_timer_after_init,
+ start_max_idle_timer_after_init, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->start_max_age_timer_after_init,
+ start_max_age_timer_after_init, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->start_max_age_grace_timer_after_goaway_op,
+ start_max_age_grace_timer_after_goaway_op, chand,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand,
+ grpc_schedule_on_exec_ctx);
+
+ if (gpr_time_cmp(chand->max_connection_age, gpr_inf_future(GPR_TIMESPAN)) !=
+ 0) {
+ /* When the channel reaches its max age, we send down an op with
+ goaway_error set. However, we can't send down any ops until after the
+ channel stack is fully initialized. If we start the timer here, we have
+ no guarantee that the timer won't pop before channel stack initialization
+ is finished. To avoid that problem, we create a closure to start the
+ timer, and we schedule that closure to be run after call stack
+ initialization is done. */
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack,
+ "max_age start_max_age_timer_after_init");
+ grpc_closure_sched(exec_ctx, &chand->start_max_age_timer_after_init,
+ GRPC_ERROR_NONE);
+ }
+
+ /* Initialize the number of calls as 1, so that the max_idle_timer will not
+ start until start_max_idle_timer_after_init is invoked. */
+ gpr_atm_rel_store(&chand->call_count, 1);
+ if (gpr_time_cmp(chand->max_connection_idle, gpr_inf_future(GPR_TIMESPAN)) !=
+ 0) {
+ GRPC_CHANNEL_STACK_REF(chand->channel_stack,
+ "max_age start_max_idle_timer_after_init");
+ grpc_closure_sched(exec_ctx, &chand->start_max_idle_timer_after_init,
+ GRPC_ERROR_NONE);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+/* Destructor for channel_data. */
+static void destroy_channel_elem(grpc_exec_ctx* exec_ctx,
+ grpc_channel_element* elem) {}
+
+const grpc_channel_filter grpc_max_age_filter = {
+ grpc_call_next_op,
+ grpc_channel_next_op,
+ 0, /* sizeof_call_data */
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ grpc_channel_next_get_info,
+ "max_age"};
diff --git a/src/core/lib/channel/max_age_filter.h b/src/core/lib/channel/max_age_filter.h
new file mode 100644
index 0000000000..93e357a88e
--- /dev/null
+++ b/src/core/lib/channel/max_age_filter.h
@@ -0,0 +1,39 @@
+//
+// Copyright 2017, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+
+#ifndef GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H
+#define GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H
+
+#include "src/core/lib/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_max_age_filter;
+
+#endif /* GRPC_CORE_LIB_CHANNEL_MAX_AGE_FILTER_H */
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 1924e76f13..f6372c0f3f 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -1107,19 +1107,20 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
grpc_closure *closure) {
while (true) {
- /* Fast-path: CLOSURE_NOT_READY -> <closure>.
- The 'release' cas here matches the 'acquire' load in set_ready and
- set_shutdown ensuring that the closure (scheduled by set_ready or
- set_shutdown) happens-after the I/O event on the fd */
- if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
- return; /* Fast-path successful. Return */
- }
-
- /* Slowpath. The 'acquire' load matches the 'release' cas in set_ready and
- set_shutdown */
- gpr_atm curr = gpr_atm_acq_load(state);
+ gpr_atm curr = gpr_atm_no_barrier_load(state);
switch (curr) {
case CLOSURE_NOT_READY: {
+ /* CLOSURE_NOT_READY -> <closure>.
+
+ We're guaranteed by API that there's an acquire barrier before here,
+ so there's no need to double-dip and this can be a release-only.
+
+ The release itself pairs with the acquire half of a set_ready full
+ barrier. */
+ if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, (gpr_atm)closure)) {
+ return; /* Successful. Return */
+ }
+
break; /* retry */
}
@@ -1134,7 +1135,7 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
- return; /* Slow-path successful. Return */
+ return; /* Successful. Return */
}
break; /* retry */
@@ -1165,30 +1166,19 @@ static void notify_on(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
grpc_error *shutdown_err) {
- /* Try the fast-path first (i.e expect the current value to be
- CLOSURE_NOT_READY */
- gpr_atm curr = CLOSURE_NOT_READY;
gpr_atm new_state = (gpr_atm)shutdown_err | FD_SHUTDOWN_BIT;
while (true) {
- /* The 'release' cas here matches the 'acquire' load in notify_on to ensure
- that the closure it schedules 'happens-after' the set_shutdown is called
- on the fd */
- if (gpr_atm_rel_cas(state, curr, new_state)) {
- return; /* Fast-path successful. Return */
- }
-
- /* Fallback to slowpath. This 'acquire' load matches the 'release' cas in
- notify_on and set_ready */
- curr = gpr_atm_acq_load(state);
+ gpr_atm curr = gpr_atm_no_barrier_load(state);
switch (curr) {
- case CLOSURE_READY: {
+ case CLOSURE_READY:
+ case CLOSURE_NOT_READY:
+ /* Need a full barrier here so that the initial load in notify_on
+ doesn't need a barrier */
+ if (gpr_atm_full_cas(state, curr, new_state)) {
+ return; /* early out */
+ }
break; /* retry */
- }
-
- case CLOSURE_NOT_READY: {
- break; /* retry */
- }
default: {
/* 'curr' is either a closure or the fd is already shutdown */
@@ -1199,10 +1189,11 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
}
/* Fd is not shutdown. Schedule the closure and move the state to
- shutdown state. The 'release' cas here matches the 'acquire' load in
- notify_on to ensure that the closure it schedules 'happens-after'
- the set_shutdown is called on the fd */
- if (gpr_atm_rel_cas(state, curr, new_state)) {
+ shutdown state.
+ Needs an acquire to pair with setting the closure (and get a
+ happens-after on that edge), and a release to pair with anything
+ loading the shutdown state. */
+ if (gpr_atm_full_cas(state, curr, new_state)) {
grpc_closure_sched(exec_ctx, (grpc_closure *)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
@@ -1220,52 +1211,42 @@ static void set_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state,
}
static void set_ready(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_atm *state) {
- /* Try an optimistic case first (i.e assume current state is
- CLOSURE_NOT_READY).
-
- This 'release' cas matches the 'acquire' load in notify_on ensuring that
- any closure (scheduled by notify_on) 'happens-after' the return from
- epoll_pwait */
- if (gpr_atm_rel_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
- return; /* early out */
- }
-
- /* The 'acquire' load here matches the 'release' cas in notify_on and
- set_shutdown */
- gpr_atm curr = gpr_atm_acq_load(state);
- switch (curr) {
- case CLOSURE_READY: {
- /* Already ready. We are done here */
- break;
- }
+ while (true) {
+ gpr_atm curr = gpr_atm_no_barrier_load(state);
- case CLOSURE_NOT_READY: {
- /* The state was not CLOSURE_NOT_READY when we checked initially at the
- beginning of this function but now it is CLOSURE_NOT_READY again.
- This is only possible if the state transitioned out of
- CLOSURE_NOT_READY to either CLOSURE_READY or <some closure> and then
- back to CLOSURE_NOT_READY again (i.e after we entered this function,
- the fd became "ready" and the necessary actions were already done).
- So there is no need to make the state CLOSURE_READY now */
- break;
- }
+ switch (curr) {
+ case CLOSURE_READY: {
+ /* Already ready. We are done here */
+ return;
+ }
- default: {
- /* 'curr' is either a closure or the fd is shutdown */
- if ((curr & FD_SHUTDOWN_BIT) > 0) {
- /* The fd is shutdown. Do nothing */
- } else if (gpr_atm_no_barrier_cas(state, curr, CLOSURE_NOT_READY)) {
- /* The cas above was no-barrier since the state is being transitioned to
- CLOSURE_NOT_READY; notify_on and set_shutdown do not schedule any
- closures when transitioning out of CLOSURE_NO_READY state (i.e there
- is no other code that needs to 'happen-after' this) */
+ case CLOSURE_NOT_READY: {
+ /* No barrier required as we're transitioning to a state that does not
+ involve a closure */
+ if (gpr_atm_no_barrier_cas(state, CLOSURE_NOT_READY, CLOSURE_READY)) {
+ return; /* early out */
+ }
+ break; /* retry */
+ }
- grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
+ default: {
+ /* 'curr' is either a closure or the fd is shutdown */
+ if ((curr & FD_SHUTDOWN_BIT) > 0) {
+ /* The fd is shutdown. Do nothing */
+ return;
+ }
+ /* Full cas: acquire pairs with this cas' release in the event of a
+ spurious set_ready; release pairs with this or the acquire in
+ notify_on (or set_shutdown) */
+ else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) {
+ grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
+ return;
+ }
+ /* else the state changed again (only possible by either a racing
+ set_ready or set_shutdown functions. In both these cases, the closure
+ would have been scheduled for execution. So we are done here */
+ return;
}
- /* else the state changed again (only possible by either a racing
- set_ready or set_shutdown functions. In both these cases, the closure
- would have been scheduled for execution. So we are done here */
- break;
}
}
}
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 91bd014a0e..b46ecac18d 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -47,6 +47,7 @@
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/channel/http_server_filter.h"
+#include "src/core/lib/channel/max_age_filter.h"
#include "src/core/lib/channel/message_size_filter.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
@@ -114,6 +115,9 @@ static void register_builtin_channel_init() {
GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
(void *)&grpc_server_deadline_filter);
grpc_channel_init_register_stage(
+ GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, prepend_filter,
+ (void *)&grpc_max_age_filter);
+ grpc_channel_init_register_stage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
prepend_filter, (void *)&grpc_message_size_filter);
grpc_channel_init_register_stage(