aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/census
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/census')
-rw-r--r--src/core/census/grpc_filter.c69
1 files changed, 29 insertions, 40 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index 872543057e..daa1a7ac61 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -53,28 +53,24 @@ typedef struct call_data {
int error;
/* recv callback */
- grpc_stream_op_buffer *recv_ops;
+ grpc_metadata_batch *recv_initial_metadata;
grpc_closure *on_done_recv;
+ grpc_closure finish_recv;
} call_data;
typedef struct channel_data {
grpc_mdstr *path_str; /* pointer to meta data str with key == ":path" */
} channel_data;
-static void extract_and_annotate_method_tag(grpc_stream_op_buffer *sopb,
+static void extract_and_annotate_method_tag(grpc_metadata_batch *md,
call_data *calld,
channel_data *chand) {
grpc_linked_mdelem *m;
- size_t i;
- for (i = 0; i < sopb->nops; i++) {
- grpc_stream_op *op = &sopb->ops[i];
- if (op->type != GRPC_OP_METADATA) continue;
- for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
- if (m->md->key == chand->path_str) {
- gpr_log(GPR_DEBUG, "%s",
- (const char *)GPR_SLICE_START_PTR(m->md->value->slice));
- /* Add method tag here */
- }
+ for (m = md->list.head; m != NULL; m = m->next) {
+ if (m->md->key == chand->path_str) {
+ gpr_log(GPR_DEBUG, "%s",
+ (const char *)GPR_SLICE_START_PTR(m->md->value->slice));
+ /* Add method tag here */
}
}
}
@@ -83,8 +79,8 @@ static void client_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (op->send_ops) {
- extract_and_annotate_method_tag(op->send_ops, calld, chand);
+ if (op->send_initial_metadata) {
+ extract_and_annotate_method_tag(op->send_initial_metadata, calld, chand);
}
}
@@ -101,7 +97,7 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (success) {
- extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
+ extract_and_annotate_method_tag(calld->recv_initial_metadata, calld, chand);
}
calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
}
@@ -109,11 +105,11 @@ static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
static void server_mutate_op(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
- if (op->recv_ops) {
+ if (op->recv_initial_metadata) {
/* substitute our callback for the op callback */
- calld->recv_ops = op->recv_ops;
- calld->on_done_recv = op->on_done_recv;
- op->on_done_recv = calld->on_done_recv;
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ calld->on_done_recv = op->on_complete;
+ op->on_complete = &calld->finish_recv;
}
}
@@ -128,13 +124,11 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
- if (initial_op) client_mutate_op(elem, initial_op);
}
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
@@ -146,15 +140,13 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
- const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */
- grpc_closure_init(d->on_done_recv, server_on_done_recv, elem);
- if (initial_op) server_mutate_op(elem, initial_op);
+ grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
}
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
@@ -165,12 +157,11 @@ static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
}
static void init_channel_elem(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem, grpc_channel *master,
- const grpc_channel_args *args, grpc_mdctx *mdctx,
- int is_first, int is_last) {
+ grpc_channel_element *elem,
+ grpc_channel_element_args *args) {
channel_data *chand = elem->channel_data;
GPR_ASSERT(chand != NULL);
- chand->path_str = grpc_mdstr_from_string(mdctx, ":path");
+ chand->path_str = grpc_mdstr_from_string(args->metadata_context, ":path");
}
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
@@ -183,15 +174,13 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_client_census_filter = {
- client_start_transport_op, grpc_channel_next_op,
- sizeof(call_data), client_init_call_elem,
- client_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem,
- grpc_call_next_get_peer, "census-client"};
+ client_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ client_init_call_elem, grpc_call_stack_ignore_set_pollset,
+ client_destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, grpc_call_next_get_peer, "census-client"};
const grpc_channel_filter grpc_server_census_filter = {
- server_start_transport_op, grpc_channel_next_op,
- sizeof(call_data), server_init_call_elem,
- server_destroy_call_elem, sizeof(channel_data),
- init_channel_elem, destroy_channel_elem,
- grpc_call_next_get_peer, "census-server"};
+ server_start_transport_op, grpc_channel_next_op, sizeof(call_data),
+ server_init_call_elem, grpc_call_stack_ignore_set_pollset,
+ server_destroy_call_elem, sizeof(channel_data), init_channel_elem,
+ destroy_channel_elem, grpc_call_next_get_peer, "census-server"};