diff options
Diffstat (limited to 'src/core/census/grpc_filter.c')
-rw-r--r-- | src/core/census/grpc_filter.c | 69 |
1 files changed, 29 insertions, 40 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c index 872543057e..daa1a7ac61 100644 --- a/src/core/census/grpc_filter.c +++ b/src/core/census/grpc_filter.c @@ -53,28 +53,24 @@ typedef struct call_data { int error; /* recv callback */ - grpc_stream_op_buffer *recv_ops; + grpc_metadata_batch *recv_initial_metadata; grpc_closure *on_done_recv; + grpc_closure finish_recv; } call_data; typedef struct channel_data { grpc_mdstr *path_str; /* pointer to meta data str with key == ":path" */ } channel_data; -static void extract_and_annotate_method_tag(grpc_stream_op_buffer *sopb, +static void extract_and_annotate_method_tag(grpc_metadata_batch *md, call_data *calld, channel_data *chand) { grpc_linked_mdelem *m; - size_t i; - for (i = 0; i < sopb->nops; i++) { - grpc_stream_op *op = &sopb->ops[i]; - if (op->type != GRPC_OP_METADATA) continue; - for (m = op->data.metadata.list.head; m != NULL; m = m->next) { - if (m->md->key == chand->path_str) { - gpr_log(GPR_DEBUG, "%s", - (const char *)GPR_SLICE_START_PTR(m->md->value->slice)); - /* Add method tag here */ - } + for (m = md->list.head; m != NULL; m = m->next) { + if (m->md->key == chand->path_str) { + gpr_log(GPR_DEBUG, "%s", + (const char *)GPR_SLICE_START_PTR(m->md->value->slice)); + /* Add method tag here */ } } } @@ -83,8 +79,8 @@ static void client_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (op->send_ops) { - extract_and_annotate_method_tag(op->send_ops, calld, chand); + if (op->send_initial_metadata) { + extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand); } } @@ -101,7 +97,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (success) { - extract_and_annotate_method_tag(calld->recv_ops, calld, chand); + extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand); } calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success); } @@ -109,11 +105,11 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr, static void server_mutate_op(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; - if (op->recv_ops) { + if (op->recv_initial_metadata) { /* substitute our callback for the op callback */ - calld->recv_ops = op->recv_ops; - calld->on_done_recv = op->on_done_recv; - op->on_done_recv = calld->on_done_recv; + calld->recv_initial_metadata = op->recv_initial_metadata; + calld->on_done_recv = op->on_complete; + op->on_complete = &calld->finish_recv; } } @@ -128,13 +124,11 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx, static void client_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); - if (initial_op) client_mutate_op(elem, initial_op); } static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, @@ -146,15 +140,13 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static void server_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); d->start_ts = gpr_now(GPR_CLOCK_REALTIME); /* TODO(hongyu): call census_tracing_start_op here. */ - grpc_closure_init(d->on_done_recv, server_on_done_recv, elem); - if (initial_op) server_mutate_op(elem, initial_op); + grpc_closure_init(&d->finish_recv, server_on_done_recv, elem); } static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, @@ -165,12 +157,11 @@ static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, } static void init_channel_elem(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, grpc_channel *master, - const grpc_channel_args *args, grpc_mdctx *mdctx, - int is_first, int is_last) { + grpc_channel_element *elem, + grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; GPR_ASSERT(chand != NULL); - chand->path_str = grpc_mdstr_from_string(mdctx, ":path"); + chand->path_str = grpc_mdstr_from_string(args->metadata_context, ":path"); } static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, @@ -183,15 +174,13 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_client_census_filter = { - client_start_transport_op, grpc_channel_next_op, - sizeof(call_data), client_init_call_elem, - client_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, "census-client"}; + client_start_transport_op, grpc_channel_next_op, sizeof(call_data), + client_init_call_elem, grpc_call_stack_ignore_set_pollset, + client_destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, grpc_call_next_get_peer, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { - server_start_transport_op, grpc_channel_next_op, - sizeof(call_data), server_init_call_elem, - server_destroy_call_elem, sizeof(channel_data), - init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, "census-server"}; + server_start_transport_op, grpc_channel_next_op, sizeof(call_data), + server_init_call_elem, grpc_call_stack_ignore_set_pollset, + server_destroy_call_elem, sizeof(channel_data), init_channel_elem, + destroy_channel_elem, grpc_call_next_get_peer, "census-server"}; |