aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel/deadline_filter.c
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2016-09-14 15:18:40 -0700
committerGravatar Mark D. Roth <roth@google.com>2016-09-14 15:18:40 -0700
commitf28763c68c9b3caf539f4d38ff123ae5de69e6d8 (patch)
treec5e7ccae33214c71f50fc006a0662bebc9e9787b /src/core/lib/channel/deadline_filter.c
parenta99e02cc0d165a226bf57eb49a290f3a626168d7 (diff)
Pass deadline into filters via grpc_call_element_args, so that we can
start the timer before the first op is sent down.
Diffstat (limited to 'src/core/lib/channel/deadline_filter.c')
-rw-r--r--src/core/lib/channel/deadline_filter.c58
1 files changed, 43 insertions, 15 deletions
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index 010fedd7b7..079b98a2f8 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -34,10 +34,12 @@
#include <stdbool.h>
#include <string.h>
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
//
@@ -106,15 +108,49 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
op->on_complete = &deadline_state->on_complete;
}
-void grpc_deadline_state_init(grpc_deadline_state* deadline_state,
- grpc_call_stack* call_stack) {
+// Callback and associated state for starting the timer after call stack
+// initialization has been completed.
+struct start_timer_after_init_state {
+ grpc_call_element* elem;
+ gpr_timespec deadline;
+ grpc_closure closure;
+};
+static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+ grpc_error* error) {
+ struct start_timer_after_init_state* state = arg;
+ start_timer_if_needed(exec_ctx, state->elem, state->deadline);
+ gpr_free(state);
+}
+
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+ grpc_call_element_args* args) {
+ grpc_deadline_state* deadline_state = elem->call_data;
memset(deadline_state, 0, sizeof(*deadline_state));
- deadline_state->call_stack = call_stack;
+ deadline_state->call_stack = args->call_stack;
gpr_mu_init(&deadline_state->timer_mu);
+ // Deadline will always be infinite on servers, so the timer will only be
+ // set on clients with a finite deadline.
+ const gpr_timespec deadline =
+ gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+ // When the deadline passes, we indicate the failure by sending down
+ // an op with cancel_error set. However, we can't send down any ops
+ // until after the call stack is fully initialized. If we start the
+ // timer here, we have no guarantee that the timer won't pop before
+ // call stack initialization is finished. To avoid that problem, we
+ // create a closure to start the timer, and we schedule that closure
+ // to be run after call stack initialization is done.
+ struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+ state->elem = elem;
+ state->deadline = deadline;
+ grpc_closure_init(&state->closure, start_timer_after_init, state);
+ grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+ }
}
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
- grpc_deadline_state* deadline_state) {
+ grpc_call_element* elem) {
+ grpc_deadline_state* deadline_state = elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
gpr_mu_destroy(&deadline_state->timer_mu);
}
@@ -127,12 +163,6 @@ void grpc_deadline_state_client_start_transport_stream_op(
op->close_error != GRPC_ERROR_NONE) {
cancel_timer_if_needed(exec_ctx, deadline_state);
} else {
- // If we're sending initial metadata, get the deadline from the metadata
- // and start the timer if needed.
- if (op->send_initial_metadata != NULL) {
- start_timer_if_needed(exec_ctx, elem,
- op->send_initial_metadata->deadline);
- }
// Make sure we know when the call is complete, so that we can cancel
// the timer.
if (op->recv_trailing_metadata != NULL) {
@@ -177,10 +207,9 @@ typedef struct server_call_data {
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
grpc_call_element_args* args) {
- base_call_data* calld = elem->call_data;
// Note: size of call data is different between client and server.
- memset(calld, 0, elem->filter->sizeof_call_data);
- grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
+ memset(elem->call_data, 0, elem->filter->sizeof_call_data);
+ grpc_deadline_state_init(exec_ctx, elem, args);
return GRPC_ERROR_NONE;
}
@@ -188,8 +217,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
const grpc_call_final_info* final_info,
void* and_free_memory) {
- base_call_data* calld = elem->call_data;
- grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
+ grpc_deadline_state_destroy(exec_ctx, elem);
}
// Method for starting a call op for client filter.