aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/channel')
-rw-r--r--src/core/lib/channel/channel_stack.c15
-rw-r--r--src/core/lib/channel/channel_stack.h8
-rw-r--r--src/core/lib/channel/channel_stack_builder.c5
-rw-r--r--src/core/lib/channel/compress_filter.c2
-rw-r--r--src/core/lib/channel/connected_channel.c2
-rw-r--r--src/core/lib/channel/deadline_filter.c107
-rw-r--r--src/core/lib/channel/deadline_filter.h13
-rw-r--r--src/core/lib/channel/handshaker.c3
-rw-r--r--src/core/lib/channel/http_client_filter.c2
-rw-r--r--src/core/lib/channel/http_server_filter.c11
-rw-r--r--src/core/lib/channel/message_size_filter.c3
11 files changed, 86 insertions, 85 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index ec973d4e7f..3fb2a60ac7 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -173,7 +173,6 @@ grpc_error *grpc_call_stack_init(
grpc_slice path, gpr_timespec start_time, gpr_timespec deadline,
grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
- grpc_call_element_args args;
size_t count = channel_stack->count;
grpc_call_element *call_elems;
char *user_data;
@@ -188,13 +187,15 @@ grpc_error *grpc_call_stack_init(
/* init per-filter data */
grpc_error *first_error = GRPC_ERROR_NONE;
- args.start_time = start_time;
+ const grpc_call_element_args args = {
+ .start_time = start_time,
+ .call_stack = call_stack,
+ .server_transport_data = transport_server_data,
+ .context = context,
+ .path = path,
+ .deadline = deadline,
+ };
for (i = 0; i < count; i++) {
- args.call_stack = call_stack;
- args.server_transport_data = transport_server_data;
- args.context = context;
- args.path = path;
- args.deadline = deadline;
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 1cf07d43c2..6d3340bcbf 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -128,10 +128,11 @@ typedef struct {
server_transport_data is an opaque pointer. If it is NULL, this call is
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
- argument. */
+ argument.
+ Implementations may assume that elem->call_data is all zeros. */
grpc_error *(*init_call_elem)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args);
+ const grpc_call_element_args *args);
void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
@@ -152,7 +153,8 @@ typedef struct {
is what needs initializing.
is_first, is_last designate this elements position in the stack, and are
useful for asserting correct configuration by upper layer code.
- The filter does not need to do any chaining */
+ The filter does not need to do any chaining.
+ Implementations may assume that elem->call_data is all zeros. */
grpc_error *(*init_channel_elem)(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
grpc_channel_element_args *args);
diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c
index 5f9e3b4539..b515b7321a 100644
--- a/src/core/lib/channel/channel_stack_builder.c
+++ b/src/core/lib/channel/channel_stack_builder.c
@@ -65,8 +65,7 @@ struct grpc_channel_stack_builder_iterator {
};
grpc_channel_stack_builder *grpc_channel_stack_builder_create(void) {
- grpc_channel_stack_builder *b = gpr_malloc(sizeof(*b));
- memset(b, 0, sizeof(*b));
+ grpc_channel_stack_builder *b = gpr_zalloc(sizeof(*b));
b->begin.filter = NULL;
b->end.filter = NULL;
@@ -251,7 +250,7 @@ grpc_error *grpc_channel_stack_builder_finish(
size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters);
// allocate memory, with prefix_bytes followed by channel_stack_size
- *result = gpr_malloc(prefix_bytes + channel_stack_size);
+ *result = gpr_zalloc(prefix_bytes + channel_stack_size);
// fetch a pointer to the channel stack
grpc_channel_stack *channel_stack =
(grpc_channel_stack *)((char *)(*result) + prefix_bytes);
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 22781c7839..aa41014a21 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -274,7 +274,7 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 068c61c92a..29796f7ca7 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -83,7 +83,7 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
int r = grpc_transport_init_stream(
diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c
index bc9a2effc2..5a12d62f1d 100644
--- a/src/core/lib/channel/deadline_filter.c
+++ b/src/core/lib/channel/deadline_filter.c
@@ -52,9 +52,6 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
grpc_error* error) {
grpc_call_element* elem = arg;
grpc_deadline_state* deadline_state = elem->call_data;
- gpr_mu_lock(&deadline_state->timer_mu);
- deadline_state->timer_pending = false;
- gpr_mu_unlock(&deadline_state->timer_mu);
if (error != GRPC_ERROR_CANCELLED) {
grpc_call_element_signal_error(
exec_ctx, elem,
@@ -66,53 +63,64 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg,
}
// Starts the deadline timer.
-static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
- grpc_call_element* elem,
- gpr_timespec deadline) {
- grpc_deadline_state* deadline_state = elem->call_data;
- deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
- // Note: We do not start the timer if there is already a timer
- // pending. This should be okay, because this is only called from two
- // functions exported by this module: grpc_deadline_state_start(), which
- // starts the initial timer, and grpc_deadline_state_reset(), which
- // cancels any pre-existing timer before starting a new one. In
- // particular, we want to ensure that if grpc_deadline_state_start()
- // winds up trying to start the timer after grpc_deadline_state_reset()
- // has already done so, we ignore the value from the former.
- if (!deadline_state->timer_pending &&
- gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
- // Take a reference to the call stack, to be owned by the timer.
- GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
- deadline_state->timer_pending = true;
- grpc_closure_init(&deadline_state->timer_callback, timer_callback, elem,
- grpc_schedule_on_exec_ctx);
- grpc_timer_init(exec_ctx, &deadline_state->timer, deadline,
- &deadline_state->timer_callback,
- gpr_now(GPR_CLOCK_MONOTONIC));
- }
-}
static void start_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
gpr_timespec deadline) {
+ deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
+ if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) {
+ return;
+ }
grpc_deadline_state* deadline_state = elem->call_data;
- gpr_mu_lock(&deadline_state->timer_mu);
- start_timer_if_needed_locked(exec_ctx, elem, deadline);
- gpr_mu_unlock(&deadline_state->timer_mu);
+ grpc_deadline_timer_state cur_state;
+ grpc_closure* closure = NULL;
+retry:
+ cur_state =
+ (grpc_deadline_timer_state)gpr_atm_acq_load(&deadline_state->timer_state);
+ switch (cur_state) {
+ case GRPC_DEADLINE_STATE_PENDING:
+ // Note: We do not start the timer if there is already a timer
+ return;
+ case GRPC_DEADLINE_STATE_FINISHED:
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_FINISHED,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ // If we've already created and destroyed a timer, we always create a
+ // new closure: we have no other guarantee that the inlined closure is
+ // not in use (it may hold a pending call to timer_callback)
+ closure = grpc_closure_create(timer_callback, elem,
+ grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
+ break;
+ case GRPC_DEADLINE_STATE_INITIAL:
+ if (gpr_atm_rel_cas(&deadline_state->timer_state,
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING)) {
+ closure =
+ grpc_closure_init(&deadline_state->timer_callback, timer_callback,
+ elem, grpc_schedule_on_exec_ctx);
+ } else {
+ goto retry;
+ }
+ break;
+ }
+ GPR_ASSERT(closure);
+ GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer");
+ grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, closure,
+ gpr_now(GPR_CLOCK_MONOTONIC));
}
// Cancels the deadline timer.
-static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx,
- grpc_deadline_state* deadline_state) {
- if (deadline_state->timer_pending) {
- grpc_timer_cancel(exec_ctx, &deadline_state->timer);
- deadline_state->timer_pending = false;
- }
-}
static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx,
grpc_deadline_state* deadline_state) {
- gpr_mu_lock(&deadline_state->timer_mu);
- cancel_timer_if_needed_locked(exec_ctx, deadline_state);
- gpr_mu_unlock(&deadline_state->timer_mu);
+ if (gpr_atm_rel_cas(&deadline_state->timer_state, GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED)) {
+ grpc_timer_cancel(exec_ctx, &deadline_state->timer);
+ } else {
+ // timer was either in STATE_INITAL (nothing to cancel)
+ // OR in STATE_FINISHED (again nothing to cancel)
+ }
}
// Callback run when the call is complete.
@@ -120,8 +128,8 @@ static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
grpc_deadline_state* deadline_state = arg;
cancel_timer_if_needed(exec_ctx, deadline_state);
// Invoke the next callback.
- deadline_state->next_on_complete->cb(
- exec_ctx, deadline_state->next_on_complete->cb_arg, error);
+ grpc_closure_run(exec_ctx, deadline_state->next_on_complete,
+ GRPC_ERROR_REF(error));
}
// Inject our own on_complete callback into op.
@@ -136,16 +144,13 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack) {
grpc_deadline_state* deadline_state = elem->call_data;
- memset(deadline_state, 0, sizeof(*deadline_state));
deadline_state->call_stack = call_stack;
- gpr_mu_init(&deadline_state->timer_mu);
}
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem) {
grpc_deadline_state* deadline_state = elem->call_data;
cancel_timer_if_needed(exec_ctx, deadline_state);
- gpr_mu_destroy(&deadline_state->timer_mu);
}
// Callback and associated state for starting the timer after call stack
@@ -187,10 +192,8 @@ void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
gpr_timespec new_deadline) {
grpc_deadline_state* deadline_state = elem->call_data;
- gpr_mu_lock(&deadline_state->timer_mu);
- cancel_timer_if_needed_locked(exec_ctx, deadline_state);
- start_timer_if_needed_locked(exec_ctx, elem, new_deadline);
- gpr_mu_unlock(&deadline_state->timer_mu);
+ cancel_timer_if_needed(exec_ctx, deadline_state);
+ start_timer_if_needed(exec_ctx, elem, new_deadline);
}
void grpc_deadline_state_client_start_transport_stream_op(
@@ -244,9 +247,7 @@ typedef struct server_call_data {
// Constructor for call_data. Used for both client and server filters.
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
- grpc_call_element_args* args) {
- // Note: size of call data is different between client and server.
- memset(elem->call_data, 0, elem->filter->sizeof_call_data);
+ const grpc_call_element_args* args) {
grpc_deadline_state_init(exec_ctx, elem, args->call_stack);
grpc_deadline_state_start(exec_ctx, elem, args->deadline);
return GRPC_ERROR_NONE;
diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h
index bd2b84f79e..72cd5cb929 100644
--- a/src/core/lib/channel/deadline_filter.h
+++ b/src/core/lib/channel/deadline_filter.h
@@ -35,16 +35,18 @@
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/timer.h"
+typedef enum grpc_deadline_timer_state {
+ GRPC_DEADLINE_STATE_INITIAL,
+ GRPC_DEADLINE_STATE_PENDING,
+ GRPC_DEADLINE_STATE_FINISHED
+} grpc_deadline_timer_state;
+
// State used for filters that enforce call deadlines.
// Must be the first field in the filter's call_data.
typedef struct grpc_deadline_state {
// We take a reference to the call stack for the timer callback.
grpc_call_stack* call_stack;
- // Guards access to timer_pending and timer.
- gpr_mu timer_mu;
- // True if the timer callback is currently pending.
- bool timer_pending;
- // The deadline timer.
+ gpr_atm timer_state;
grpc_timer timer;
grpc_closure timer_callback;
// Closure to invoke when the call is complete.
@@ -60,6 +62,7 @@ typedef struct grpc_deadline_state {
// elem->call_data is a grpc_deadline_state.
//
+// assumes elem->call_data is zero'd
void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
grpc_call_stack* call_stack);
void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c
index 82c361c7ef..1b4240bb10 100644
--- a/src/core/lib/channel/handshaker.c
+++ b/src/core/lib/channel/handshaker.c
@@ -99,8 +99,7 @@ struct grpc_handshake_manager {
};
grpc_handshake_manager* grpc_handshake_manager_create() {
- grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager));
- memset(mgr, 0, sizeof(*mgr));
+ grpc_handshake_manager* mgr = gpr_zalloc(sizeof(grpc_handshake_manager));
gpr_mu_init(&mgr->mu);
gpr_ref_init(&mgr->refs, 1);
return mgr;
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index 49a2a980e0..c031533dd8 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -386,7 +386,7 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
calld->on_done_recv_initial_metadata = NULL;
calld->on_done_recv_trailing_metadata = NULL;
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index bb185351a8..fb70de8e96 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -252,12 +252,11 @@ static void hs_on_complete(grpc_exec_ctx *exec_ctx, void *user_data,
*calld->pp_recv_message = calld->payload_bin_delivered
? NULL
: (grpc_byte_stream *)&calld->read_stream;
- calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
- err);
+ grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
calld->recv_message_ready = NULL;
calld->payload_bin_delivered = true;
}
- calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, err);
+ grpc_closure_run(exec_ctx, calld->on_complete, GRPC_ERROR_REF(err));
}
static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
@@ -268,8 +267,7 @@ static void hs_recv_message_ready(grpc_exec_ctx *exec_ctx, void *user_data,
/* do nothing. This is probably a GET request, and payload will be returned
in hs_on_complete callback. */
} else {
- calld->recv_message_ready->cb(exec_ctx, calld->recv_message_ready->cb_arg,
- err);
+ grpc_closure_run(exec_ctx, calld->recv_message_ready, GRPC_ERROR_REF(err));
}
}
@@ -343,11 +341,10 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
/* Constructor for call_data */
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- grpc_call_element_args *args) {
+ const grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
- memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem,
grpc_schedule_on_exec_ctx);
grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem,
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c
index 5e22860cfb..b424c0d2ac 100644
--- a/src/core/lib/channel/message_size_filter.c
+++ b/src/core/lib/channel/message_size_filter.c
@@ -166,7 +166,7 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx,
// Constructor for call_data.
static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
grpc_call_element* elem,
- grpc_call_element_args* args) {
+ const grpc_call_element_args* args) {
channel_data* chand = elem->channel_data;
call_data* calld = elem->call_data;
calld->next_recv_message_ready = NULL;
@@ -208,7 +208,6 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
grpc_channel_element_args* args) {
GPR_ASSERT(!args->is_last);
channel_data* chand = elem->channel_data;
- memset(chand, 0, sizeof(*chand));
chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH;
chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH;
for (size_t i = 0; i < args->channel_args->num_args; ++i) {