diff options
author | Mark D. Roth <roth@google.com> | 2016-08-31 08:33:37 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-08-31 08:33:37 -0700 |
commit | 1bbe6cb143e98805a5f9dc2745a45bff85bd768c (patch) | |
tree | a158b0ff0b5179b5409d75fab1874b875bcd2bf6 /src | |
parent | d59a5fc9ee39b32bcb076e80ff81d9e3170bc9cb (diff) |
Add locking. Add cancellation check. Use grpc_call_element_send_cancel().
Diffstat (limited to 'src')
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 105 | ||||
-rw-r--r-- | src/core/lib/transport/transport.c | 4 |
2 files changed, 72 insertions, 37 deletions
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 852fbaf003..9dff1c4d63 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -35,6 +35,7 @@ #include <string.h> #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include <grpc/support/time.h> #include "src/core/lib/iomgr/timer.h" @@ -47,6 +48,8 @@ typedef struct channel_data { typedef struct base_call_data { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; + // Guards access to timer_pending and timer. + gpr_mu timer_mu; // True if the timer callback is currently pending. bool timer_pending; // The deadline timer. @@ -91,6 +94,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx, // Note: size of call data is different between client and server. memset(calld, 0, elem->filter->sizeof_call_data); calld->call_stack = args->call_stack; + gpr_mu_init(&calld->timer_mu); return GRPC_ERROR_NONE; } @@ -98,6 +102,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx* exec_ctx, static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, void* and_free_memory) { + base_call_data* calld = elem->call_data; + gpr_mu_destroy(&calld->timer_mu); } // Timer callback. @@ -105,13 +111,13 @@ static void timer_callback(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element* elem = arg; base_call_data* calld = elem->call_data; + gpr_mu_lock(&calld->timer_mu); calld->timer_pending = false; + gpr_mu_unlock(&calld->timer_mu); if (error != GRPC_ERROR_CANCELLED) { - gpr_slice message = gpr_slice_from_static_string("Deadline Exceeded"); - grpc_call_element_send_cancel_with_message( - exec_ctx, elem, GRPC_STATUS_DEADLINE_EXCEEDED, &message); + grpc_call_element_send_cancel(exec_ctx, elem); } - GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline"); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->call_stack, "deadline_timer"); } // Starts the deadline timer. @@ -122,20 +128,30 @@ static void start_timer_if_needed(grpc_exec_ctx *exec_ctx, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // Take a reference to the call stack, to be owned by the timer. - GRPC_CALL_STACK_REF(calld->call_stack, "deadline"); + GRPC_CALL_STACK_REF(calld->call_stack, "deadline_timer"); + gpr_mu_lock(&calld->timer_mu); + calld->timer_pending = true; grpc_timer_init(exec_ctx, &calld->timer, deadline, timer_callback, elem, gpr_now(GPR_CLOCK_MONOTONIC)); - calld->timer_pending = true; + gpr_mu_unlock(&calld->timer_mu); } } -// Callback run when the call is complete. -static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - base_call_data* calld = arg; +// Cancels the deadline timer. +static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, + base_call_data* calld) { + gpr_mu_lock(&calld->timer_mu); if (calld->timer_pending) { grpc_timer_cancel(exec_ctx, &calld->timer); calld->timer_pending = false; } + gpr_mu_unlock(&calld->timer_mu); +} + +// Callback run when the call is complete. +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + base_call_data* calld = arg; + cancel_timer_if_needed(exec_ctx, calld); // Invoke the next callback. calld->next_on_complete->cb(exec_ctx, calld->next_on_complete->cb_arg, error); } @@ -145,18 +161,24 @@ static void client_start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { base_call_data* calld = elem->call_data; - // If we're sending initial metadata, get the deadline from the metadata - // and start the timer if needed. - if (op->send_initial_metadata != NULL) { - start_timer_if_needed(exec_ctx, elem, - op->send_initial_metadata->deadline); - } - // Make sure we know when the call is complete, so that we can cancel - // the timer. - if (op->recv_trailing_metadata != NULL) { - calld->next_on_complete = op->on_complete; - grpc_closure_init(&calld->on_complete, on_complete, calld); - op->on_complete = &calld->on_complete; + // If the call is cancelled or closed, cancel the timer. + if (op->cancel_error != GRPC_ERROR_NONE || + op->close_error != GRPC_ERROR_NONE) { + cancel_timer_if_needed(exec_ctx, calld); + } else { + // If we're sending initial metadata, get the deadline from the metadata + // and start the timer if needed. + if (op->send_initial_metadata != NULL) { + start_timer_if_needed(exec_ctx, elem, + op->send_initial_metadata->deadline); + } + // Make sure we know when the call is complete, so that we can cancel + // the timer. + if (op->recv_trailing_metadata != NULL) { + calld->next_on_complete = op->on_complete; + grpc_closure_init(&calld->on_complete, on_complete, calld); + op->on_complete = &calld->on_complete; + } } // Chain to next filter. grpc_call_next_op(exec_ctx, elem, op); @@ -180,22 +202,31 @@ static void server_start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { server_call_data* calld = elem->call_data; - // If we're receiving initial metadata, we need to get the deadline - // from the recv_initial_metadata_ready callback. So we inject our - // own callback into that hook. - if (op->recv_initial_metadata_ready != NULL) { - calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready; - calld->recv_initial_metadata = op->recv_initial_metadata; - grpc_closure_init(&calld->recv_initial_metadata_ready, - recv_initial_metadata_ready, elem); - op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; - } - // Make sure we know when the call is complete, so that we can cancel - // the timer. - if (op->send_trailing_metadata != NULL) { - calld->base.next_on_complete = op->on_complete; - grpc_closure_init(&calld->base.on_complete, on_complete, calld); - op->on_complete = &calld->base.on_complete; + // If the call is cancelled or closed, cancel the timer. + if (op->cancel_error != GRPC_ERROR_NONE || + op->close_error != GRPC_ERROR_NONE) { + cancel_timer_if_needed(exec_ctx, &calld->base); + } else { + // If we're receiving initial metadata, we need to get the deadline + // from the recv_initial_metadata_ready callback. So we inject our + // own callback into that hook. + if (op->recv_initial_metadata_ready != NULL) { + calld->next_recv_initial_metadata_ready = op->recv_initial_metadata_ready; + calld->recv_initial_metadata = op->recv_initial_metadata; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem); + op->recv_initial_metadata_ready = &calld->recv_initial_metadata_ready; + } + // Make sure we know when the call is complete, so that we can cancel + // the timer. + // Note that we trigger this on recv_trailing_metadata, even though + // the client never sends trailing metadata, because this is the + // hook that tells us when the call is complete on the server side. + if (op->recv_trailing_metadata != NULL) { + calld->base.next_on_complete = op->on_complete; + grpc_closure_init(&calld->base.on_complete, on_complete, calld); + op->on_complete = &calld->base.on_complete; + } } // Chain to next filter. grpc_call_next_op(exec_ctx, elem, op); diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 36672bdbc5..d4e197fa5c 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -220,6 +220,10 @@ void grpc_transport_stream_op_add_cancellation_with_message( error = GRPC_ERROR_CREATE("Call cancelled"); } error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, status); + // TODO(ctiller): We are intentionally setting close_error instead of + // cancel_error here. This is an ugly hack and should be replaced + // by a more general-purpose mechanism that allows us to control + // cancel/close behavior. add_error(op, &op->close_error, error); } |