aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/load_reporting/load_reporting_filter.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/load_reporting/load_reporting_filter.c')
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c153
1 files changed, 117 insertions, 36 deletions
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index f372f88c3a..99b560ae27 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -31,6 +31,7 @@
*
*/
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
@@ -42,17 +43,67 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/transport/static_metadata.h"
-typedef struct call_data { const char *trailing_md_string; } call_data;
+typedef struct call_data {
+ intptr_t id; /**< an id unique to the call */
+ char *trailing_md_string;
+ char *initial_md_string;
+ const char *service_method;
+
+ /* stores the recv_initial_metadata op's ready closure, which we wrap with our
+ * own (on_initial_md_ready) in order to capture the incoming initial metadata
+ * */
+ grpc_closure *ops_recv_initial_metadata_ready;
+
+ /* to get notified of the availability of the incoming initial metadata. */
+ grpc_closure on_initial_md_ready;
+ grpc_metadata_batch *recv_initial_metadata;
+} call_data;
+
typedef struct channel_data {
- gpr_mu mu;
- grpc_load_reporting_config *lrc;
+ intptr_t id; /**< an id unique to the channel */
} channel_data;
-static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc,
- grpc_load_reporting_call_data *lr_call_data) {
- GPR_TIMER_BEGIN("load_reporting_config_fn", 0);
- grpc_load_reporting_config_call(lrc, lr_call_data);
- GPR_TIMER_END("load_reporting_config_fn", 0);
+typedef struct {
+ grpc_call_element *elem;
+ grpc_exec_ctx *exec_ctx;
+} recv_md_filter_args;
+
+static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
+ recv_md_filter_args *a = user_data;
+ grpc_call_element *elem = a->elem;
+ call_data *calld = elem->call_data;
+
+ if (md->key == GRPC_MDSTR_PATH) {
+ calld->service_method = grpc_mdstr_as_c_string(md->value);
+ } else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) {
+ calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
+ return NULL;
+ }
+
+ return md;
+}
+
+static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
+ grpc_error *err) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+
+ if (err == GRPC_ERROR_NONE) {
+ recv_md_filter_args a;
+ a.elem = elem;
+ a.exec_ctx = exec_ctx;
+ grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter,
+ &a);
+ if (calld->service_method == NULL) {
+ err =
+ grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header"));
+ }
+ } else {
+ GRPC_ERROR_REF(err);
+ }
+ calld->ops_recv_initial_metadata_ready->cb(
+ exec_ctx, calld->ops_recv_initial_metadata_ready->cb_arg, err);
+ GRPC_ERROR_UNREF(err);
}
/* Constructor for call_data */
@@ -60,20 +111,41 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
+
+ calld->id = (intptr_t)args->call_stack;
+ grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem);
+
+ /* TODO(dgq): do something with the data
+ channel_data *chand = elem->channel_data;
+ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION,
+ (intptr_t)chand->id,
+ (intptr_t)calld->id,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ */
}
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- const grpc_call_stats *stats, void *ignored) {
- channel_data *chand = elem->channel_data;
+ const grpc_call_final_info *final_info,
+ void *ignored) {
call_data *calld = elem->call_data;
- grpc_load_reporting_call_data lr_call_data = {stats,
- calld->trailing_md_string};
-
- gpr_mu_lock(&chand->mu);
- invoke_lr_fn_locked(chand->lrc, &lr_call_data);
- gpr_mu_unlock(&chand->mu);
+ /* TODO(dgq): do something with the data
+ channel_data *chand = elem->channel_data;
+ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION,
+ (intptr_t)chand->id,
+ (intptr_t)calld->id,
+ final_info,
+ calld->initial_md_string,
+ calld->trailing_md_string,
+ calld->service_method};
+ */
+
+ gpr_free(calld->initial_md_string);
+ gpr_free(calld->trailing_md_string);
}
/* Constructor for channel_data */
@@ -85,37 +157,40 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(channel_data));
- gpr_mu_init(&chand->mu);
- for (size_t i = 0; i < args->channel_args->num_args; i++) {
- if (0 == strcmp(args->channel_args->args[i].key,
- GRPC_ARG_ENABLE_LOAD_REPORTING)) {
- grpc_load_reporting_config *arg_lrc =
- args->channel_args->args[i].value.pointer.p;
- chand->lrc = grpc_load_reporting_config_copy(arg_lrc);
- GPR_ASSERT(chand->lrc != NULL);
- break;
- }
- }
- GPR_ASSERT(chand->lrc != NULL); /* arg actually found */
-
- gpr_mu_lock(&chand->mu);
- invoke_lr_fn_locked(chand->lrc, NULL);
- gpr_mu_unlock(&chand->mu);
+ chand->id = (intptr_t)args->channel_stack;
+
+ /* TODO(dgq): do something with the data
+ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION,
+ (intptr_t)chand,
+ 0,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ */
}
/* Destructor for channel data */
static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
+ /* TODO(dgq): do something with the data
channel_data *chand = elem->channel_data;
- gpr_mu_destroy(&chand->mu);
- grpc_load_reporting_config_destroy(chand->lrc);
+ grpc_load_reporting_call_data lr_call_data = {
+ GRPC_LR_POINT_CHANNEL_DESTRUCTION,
+ (intptr_t)chand->id,
+ 0,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ */
}
static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
- if (md->key == GRPC_MDSTR_LOAD_REPORTING) {
+ if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) {
calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
return NULL;
}
@@ -127,8 +202,14 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_transport_stream_op *op) {
GPR_TIMER_BEGIN("lr_start_transport_stream_op", 0);
+ call_data *calld = elem->call_data;
- if (op->send_trailing_metadata) {
+ if (op->recv_initial_metadata) {
+ calld->recv_initial_metadata = op->recv_initial_metadata;
+ /* substitute our callback for the higher callback */
+ calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
+ } else if (op->send_trailing_metadata) {
grpc_metadata_batch_filter(op->send_trailing_metadata,
lr_trailing_md_filter, elem);
}