diff options
Diffstat (limited to 'src/core/ext/load_reporting/load_reporting_filter.c')
-rw-r--r-- | src/core/ext/load_reporting/load_reporting_filter.c | 153 |
1 files changed, 117 insertions, 36 deletions
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index f372f88c3a..99b560ae27 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -31,6 +31,7 @@ * */ +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> @@ -42,17 +43,67 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/transport/static_metadata.h" -typedef struct call_data { const char *trailing_md_string; } call_data; +typedef struct call_data { + intptr_t id; /**< an id unique to the call */ + char *trailing_md_string; + char *initial_md_string; + const char *service_method; + + /* stores the recv_initial_metadata op's ready closure, which we wrap with our + * own (on_initial_md_ready) in order to capture the incoming initial metadata + * */ + grpc_closure *ops_recv_initial_metadata_ready; + + /* to get notified of the availability of the incoming initial metadata. */ + grpc_closure on_initial_md_ready; + grpc_metadata_batch *recv_initial_metadata; +} call_data; + typedef struct channel_data { - gpr_mu mu; - grpc_load_reporting_config *lrc; + intptr_t id; /**< an id unique to the channel */ } channel_data; -static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc, - grpc_load_reporting_call_data *lr_call_data) { - GPR_TIMER_BEGIN("load_reporting_config_fn", 0); - grpc_load_reporting_config_call(lrc, lr_call_data); - GPR_TIMER_END("load_reporting_config_fn", 0); +typedef struct { + grpc_call_element *elem; + grpc_exec_ctx *exec_ctx; +} recv_md_filter_args; + +static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) { + recv_md_filter_args *a = user_data; + grpc_call_element *elem = a->elem; + call_data *calld = elem->call_data; + + if (md->key == GRPC_MDSTR_PATH) { + calld->service_method = grpc_mdstr_as_c_string(md->value); + } else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) { + calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value)); + return NULL; + } + + return md; +} + +static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_error *err) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + + if (err == GRPC_ERROR_NONE) { + recv_md_filter_args a; + a.elem = elem; + a.exec_ctx = exec_ctx; + grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter, + &a); + if (calld->service_method == NULL) { + err = + grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header")); + } + } else { + GRPC_ERROR_REF(err); + } + calld->ops_recv_initial_metadata_ready->cb( + exec_ctx, calld->ops_recv_initial_metadata_ready->cb_arg, err); + GRPC_ERROR_UNREF(err); } /* Constructor for call_data */ @@ -60,20 +111,41 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; memset(calld, 0, sizeof(call_data)); + + calld->id = (intptr_t)args->call_stack; + grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem); + + /* TODO(dgq): do something with the data + channel_data *chand = elem->channel_data; + grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION, + (intptr_t)chand->id, + (intptr_t)calld->id, + NULL, + NULL, + NULL, + NULL}; + */ } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - const grpc_call_stats *stats, void *ignored) { - channel_data *chand = elem->channel_data; + const grpc_call_final_info *final_info, + void *ignored) { call_data *calld = elem->call_data; - grpc_load_reporting_call_data lr_call_data = {stats, - calld->trailing_md_string}; - - gpr_mu_lock(&chand->mu); - invoke_lr_fn_locked(chand->lrc, &lr_call_data); - gpr_mu_unlock(&chand->mu); + /* TODO(dgq): do something with the data + channel_data *chand = elem->channel_data; + grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION, + (intptr_t)chand->id, + (intptr_t)calld->id, + final_info, + calld->initial_md_string, + calld->trailing_md_string, + calld->service_method}; + */ + + gpr_free(calld->initial_md_string); + gpr_free(calld->trailing_md_string); } /* Constructor for channel_data */ @@ -85,37 +157,40 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; memset(chand, 0, sizeof(channel_data)); - gpr_mu_init(&chand->mu); - for (size_t i = 0; i < args->channel_args->num_args; i++) { - if (0 == strcmp(args->channel_args->args[i].key, - GRPC_ARG_ENABLE_LOAD_REPORTING)) { - grpc_load_reporting_config *arg_lrc = - args->channel_args->args[i].value.pointer.p; - chand->lrc = grpc_load_reporting_config_copy(arg_lrc); - GPR_ASSERT(chand->lrc != NULL); - break; - } - } - GPR_ASSERT(chand->lrc != NULL); /* arg actually found */ - - gpr_mu_lock(&chand->mu); - invoke_lr_fn_locked(chand->lrc, NULL); - gpr_mu_unlock(&chand->mu); + chand->id = (intptr_t)args->channel_stack; + + /* TODO(dgq): do something with the data + grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION, + (intptr_t)chand, + 0, + NULL, + NULL, + NULL, + NULL}; + */ } /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { + /* TODO(dgq): do something with the data channel_data *chand = elem->channel_data; - gpr_mu_destroy(&chand->mu); - grpc_load_reporting_config_destroy(chand->lrc); + grpc_load_reporting_call_data lr_call_data = { + GRPC_LR_POINT_CHANNEL_DESTRUCTION, + (intptr_t)chand->id, + 0, + NULL, + NULL, + NULL, + NULL}; + */ } static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; - if (md->key == GRPC_MDSTR_LOAD_REPORTING) { + if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) { calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value)); return NULL; } @@ -127,8 +202,14 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op *op) { GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0); + call_data *calld = elem->call_data; - if (op->send_trailing_metadata) { + if (op->recv_initial_metadata) { + calld->recv_initial_metadata = op->recv_initial_metadata; + /* substitute our callback for the higher callback */ + calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready; + op->recv_initial_metadata_ready = &calld->on_initial_md_ready; + } else if (op->send_trailing_metadata) { grpc_metadata_batch_filter(op->send_trailing_metadata, lr_trailing_md_filter, elem); } |