diff options
Diffstat (limited to 'src/core/channel/http_client_filter.c')
-rw-r--r-- | src/core/channel/http_client_filter.c | 75 |
1 files changed, 40 insertions, 35 deletions
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 5f20f8c16d..d67dc37ad2 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -50,11 +50,11 @@ typedef struct call_data { grpc_stream_op_buffer *recv_ops; /** Closure to call when finished with the hc_on_recv hook */ - grpc_iomgr_closure *on_done_recv; + grpc_closure *on_done_recv; /** Receive closures are chained: we inject this closure as the on_done_recv up-call on transport_op, and remember to call our on_done_recv member after handling it. */ - grpc_iomgr_closure hc_on_recv; + grpc_closure hc_on_recv; } call_data; typedef struct channel_data { @@ -67,16 +67,19 @@ typedef struct channel_data { grpc_mdelem *user_agent; } channel_data; -/* used to silence 'variable not used' warnings */ -static void ignore_unused(void *ignored) {} +typedef struct { + grpc_call_element *elem; + grpc_exec_ctx *exec_ctx; +} client_recv_filter_args; static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { - grpc_call_element *elem = user_data; + client_recv_filter_args *a = user_data; + grpc_call_element *elem = a->elem; channel_data *channeld = elem->channel_data; if (md == channeld->status) { return NULL; } else if (md->key == channeld->status->key) { - grpc_call_element_send_cancel(elem); + grpc_call_element_send_cancel(a->exec_ctx, elem); return NULL; } else if (md->key == channeld->content_type->key) { return NULL; @@ -84,7 +87,7 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) { return md; } -static void hc_on_recv(void *user_data, int success) { +static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; size_t i; @@ -92,15 +95,16 @@ static void hc_on_recv(void *user_data, int success) { grpc_stream_op *ops = calld->recv_ops->ops; for (i = 0; i < nops; i++) { grpc_stream_op *op = &ops[i]; + client_recv_filter_args a; if (op->type != GRPC_OP_METADATA) continue; calld->got_initial_metadata = 1; - grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, elem); + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(&op->data.metadata, client_recv_filter, &a); } - calld->on_done_recv->cb(calld->on_done_recv->cb_arg, success); + calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); } - - static grpc_mdelem *client_strip_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; channel_data *channeld = elem->channel_data; @@ -123,21 +127,25 @@ static void hc_mutate_op(grpc_call_element *elem, size_t nops = op->send_ops->nops; grpc_stream_op *ops = op->send_ops->ops; for (i = 0; i < nops; i++) { - grpc_stream_op *op = &ops[i]; - if (op->type != GRPC_OP_METADATA) continue; + grpc_stream_op *stream_op = &ops[i]; + if (stream_op->type != GRPC_OP_METADATA) continue; calld->sent_initial_metadata = 1; - grpc_metadata_batch_filter(&op->data.metadata, client_strip_filter, elem); + grpc_metadata_batch_filter(&stream_op->data.metadata, client_strip_filter, + elem); /* Send : prefixed headers, which have to be before any application layer headers. */ - grpc_metadata_batch_add_head(&op->data.metadata, &calld->method, + grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->method, GRPC_MDELEM_REF(channeld->method)); - grpc_metadata_batch_add_head(&op->data.metadata, &calld->scheme, + grpc_metadata_batch_add_head(&stream_op->data.metadata, &calld->scheme, GRPC_MDELEM_REF(channeld->scheme)); - grpc_metadata_batch_add_tail(&op->data.metadata, &calld->te_trailers, + grpc_metadata_batch_add_tail(&stream_op->data.metadata, + &calld->te_trailers, GRPC_MDELEM_REF(channeld->te_trailers)); - grpc_metadata_batch_add_tail(&op->data.metadata, &calld->content_type, + grpc_metadata_batch_add_tail(&stream_op->data.metadata, + &calld->content_type, GRPC_MDELEM_REF(channeld->content_type)); - grpc_metadata_batch_add_tail(&op->data.metadata, &calld->user_agent, + grpc_metadata_batch_add_tail(&stream_op->data.metadata, + &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); break; } @@ -151,34 +159,29 @@ static void hc_mutate_op(grpc_call_element *elem, } } -static void hc_start_transport_op(grpc_call_element *elem, +static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, grpc_transport_stream_op *op) { GRPC_CALL_LOG_OP(GPR_INFO, elem, op); hc_mutate_op(elem, op); - grpc_call_next_op(elem, op); + grpc_call_next_op(exec_ctx, elem, op); } /* Constructor for call_data */ -static void init_call_elem(grpc_call_element *elem, +static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const void *server_transport_data, grpc_transport_stream_op *initial_op) { call_data *calld = elem->call_data; calld->sent_initial_metadata = 0; calld->got_initial_metadata = 0; calld->on_done_recv = NULL; - grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem); + grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem); if (initial_op) hc_mutate_op(elem, initial_op); } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - - ignore_unused(calld); - ignore_unused(channeld); -} +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) {} static const char *scheme_from_args(const grpc_channel_args *args) { unsigned i; @@ -236,14 +239,15 @@ static grpc_mdstr *user_agent_from_args(grpc_mdctx *mdctx, tmp = gpr_strvec_flatten(&v, NULL); gpr_strvec_destroy(&v); - result = grpc_mdstr_from_string(mdctx, tmp, 0); + result = grpc_mdstr_from_string(mdctx, tmp); gpr_free(tmp); return result; } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, +static void init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *channel_args, grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ @@ -263,12 +267,13 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc"); channeld->status = grpc_mdelem_from_strings(mdctx, ":status", "200"); channeld->user_agent = grpc_mdelem_from_metadata_strings( - mdctx, grpc_mdstr_from_string(mdctx, "user-agent", 0), + mdctx, grpc_mdstr_from_string(mdctx, "user-agent"), user_agent_from_args(mdctx, channel_args)); } /* Destructor for channel data */ -static void destroy_channel_elem(grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) { /* grab pointers to our data from the channel element */ channel_data *channeld = elem->channel_data; |