aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/load_reporting/load_reporting.c20
-rw-r--r--src/core/ext/load_reporting/load_reporting.h40
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c104
-rw-r--r--src/core/lib/channel/channel_stack.h3
4 files changed, 103 insertions, 64 deletions
diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c
index 9e4d32676f..1a1562d058 100644
--- a/src/core/ext/load_reporting/load_reporting.c
+++ b/src/core/ext/load_reporting/load_reporting.c
@@ -50,11 +50,11 @@ struct grpc_load_reporting_config {
grpc_load_reporting_config *grpc_load_reporting_config_create(
grpc_load_reporting_fn fn, void *user_data) {
GPR_ASSERT(fn != NULL);
- grpc_load_reporting_config *lrc =
+ grpc_load_reporting_config *lr_config =
gpr_malloc(sizeof(grpc_load_reporting_config));
- lrc->fn = fn;
- lrc->user_data = user_data;
- return lrc;
+ lr_config->fn = fn;
+ lr_config->user_data = user_data;
+ return lr_config;
}
grpc_load_reporting_config *grpc_load_reporting_config_copy(
@@ -62,14 +62,14 @@ grpc_load_reporting_config *grpc_load_reporting_config_copy(
return grpc_load_reporting_config_create(src->fn, src->user_data);
}
-void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc) {
- gpr_free(lrc);
+void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lr_config) {
+ gpr_free(lr_config);
}
void grpc_load_reporting_config_call(
- grpc_load_reporting_config *lrc,
+ grpc_load_reporting_config *lr_config,
const grpc_load_reporting_call_data *call_data) {
- lrc->fn(call_data, lrc->user_data);
+ lr_config->fn(call_data, lr_config->user_data);
}
static bool is_load_reporting_enabled(const grpc_channel_args *a) {
@@ -110,11 +110,11 @@ static const grpc_arg_pointer_vtable lrd_ptr_vtable = {
lrd_arg_copy, lrd_arg_destroy, lrd_arg_cmp};
grpc_arg grpc_load_reporting_config_create_arg(
- grpc_load_reporting_config *lrc) {
+ grpc_load_reporting_config *lr_config) {
grpc_arg arg;
arg.type = GRPC_ARG_POINTER;
arg.key = GRPC_ARG_ENABLE_LOAD_REPORTING;
- arg.value.pointer.p = lrc;
+ arg.value.pointer.p = lr_config;
arg.value.pointer.vtable = &lrd_ptr_vtable;
return arg;
}
diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/load_reporting/load_reporting.h
index 4f3ecd3661..71e5a7c0ab 100644
--- a/src/core/ext/load_reporting/load_reporting.h
+++ b/src/core/ext/load_reporting/load_reporting.h
@@ -37,8 +37,15 @@
#include <grpc/impl/codegen/grpc_types.h>
#include "src/core/lib/channel/channel_stack.h"
+/** Metadata key for initial metadata coming from clients */
+#define GRPC_LOAD_REPORTING_INITIAL_MD_KEY "load-reporting-initial"
+
+/** Metadata key for trailing metadata from servers */
+#define GRPC_LOAD_REPORTING_TRAILING_MD_KEY "load-reporting-trailing"
+
typedef struct grpc_load_reporting_config grpc_load_reporting_config;
+/** Identifiers for the invocation point of the users LR callback */
typedef enum grpc_load_reporting_source {
GRPC_LR_POINT_UNKNOWN = 0,
GRPC_LR_POINT_CHANNEL_CREATION,
@@ -47,17 +54,31 @@ typedef enum grpc_load_reporting_source {
GRPC_LR_POINT_CALL_DESTRUCTION
} grpc_load_reporting_source;
-/** Call information to be passed to the provided load reporting function upon
- * completion of the call */
+/** Call information to be passed to the provided LR callback. */
typedef struct grpc_load_reporting_call_data {
- const grpc_load_reporting_source source;
+ const grpc_load_reporting_source source; /**< point of last data update. */
+
+ // XXX
+ intptr_t channel_id;
+ intptr_t call_id;
+
+ /** Only valid when \a source is \a GRPC_LR_POINT_CALL_DESTRUCTION, that is,
+ * once the call has completed */
const grpc_call_final_info *final_info;
+
const char *initial_md_string; /**< value string for LR's initial md key */
const char *trailing_md_string; /**< value string for LR's trailing md key */
- const char *method; /**< Corresponds to :path header */
+ const char *method_name; /**< Corresponds to :path header */
} grpc_load_reporting_call_data;
-/** Custom function to be called by the load reporting filter. */
+/** Return a \a grpc_arg enabling load reporting */
+grpc_arg grpc_load_reporting_config_create_arg(
+ grpc_load_reporting_config *lr_config);
+
+/** Custom function to be called by the load reporting filter.
+ *
+ * \a call_data is provided by the runtime. \a user_data is given by the user
+ * as part of \a grpc_load_reporting_config_create */
typedef void (*grpc_load_reporting_fn)(
const grpc_load_reporting_call_data *call_data, void *user_data);
@@ -73,14 +94,11 @@ grpc_load_reporting_config *grpc_load_reporting_config_create(
grpc_load_reporting_config *grpc_load_reporting_config_copy(
grpc_load_reporting_config *src);
-void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc);
+void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lr_config);
-/** Invoke the function registered by \a grpc_load_reporting_init. */
+/** Invoke the LR callback from \a lr_config with \a call_data */
void grpc_load_reporting_config_call(
- grpc_load_reporting_config *lrc,
+ grpc_load_reporting_config *lr_config,
const grpc_load_reporting_call_data *call_data);
-/** Return a \a grpc_arg enabling load reporting */
-grpc_arg grpc_load_reporting_config_create_arg(grpc_load_reporting_config *lrc);
-
#endif /* GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H */
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index 11a39c5b75..55e06f2774 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -31,6 +31,7 @@
*
*/
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
@@ -43,37 +44,40 @@
#include "src/core/lib/transport/static_metadata.h"
typedef struct call_data {
- const char *trailing_md_string;
- const char *initial_md_string;
+ intptr_t id; /**< an id unique to the call */
+ char *trailing_md_string;
+ char *initial_md_string;
const char *service_method;
- grpc_metadata_batch *recv_initial_metadata;
-
+ /* stores the recv_initial_metadata op's ready closure, which we wrap with our
+ * own (on_initial_md_ready) in order to capture the incoming initial metadata
+ * */
grpc_closure *ops_recv_initial_metadata_ready;
+ /* to get notified of the availability of the incoming initial metadata. */
grpc_closure on_initial_md_ready;
-
+ grpc_metadata_batch *recv_initial_metadata;
} call_data;
typedef struct channel_data {
- gpr_mu mu;
- grpc_load_reporting_config *lrc;
+ intptr_t id; /**< an id unique to the channel */
+ grpc_load_reporting_config *lr_config;
} channel_data;
-static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc,
+static void invoke_lr_fn_locked(grpc_load_reporting_config *lr_config,
grpc_load_reporting_call_data *lr_call_data) {
GPR_TIMER_BEGIN("load_reporting_config_fn", 0);
- grpc_load_reporting_config_call(lrc, lr_call_data);
+ grpc_load_reporting_config_call(lr_config, lr_call_data);
GPR_TIMER_END("load_reporting_config_fn", 0);
}
typedef struct {
grpc_call_element *elem;
grpc_exec_ctx *exec_ctx;
-} server_filter_args;
+} recv_md_filter_args;
static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
- server_filter_args *a = user_data;
+ recv_md_filter_args *a = user_data;
grpc_call_element *elem = a->elem;
call_data *calld = elem->call_data;
@@ -81,22 +85,22 @@ static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) {
calld->service_method = grpc_mdstr_as_c_string(md->value);
} else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) {
calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
- return NULL;
}
return md;
}
static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
- grpc_error *err) {
+ grpc_error *err) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (err == GRPC_ERROR_NONE) {
- server_filter_args a;
+ recv_md_filter_args a;
a.elem = elem;
a.exec_ctx = exec_ctx;
- grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter, &a);
+ grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter,
+ &a);
if (calld->service_method == NULL) {
err =
grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header"));
@@ -116,13 +120,17 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
+ calld->id = (intptr_t)args->call_stack;
grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem);
grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION,
- NULL, NULL, NULL, NULL};
- gpr_mu_lock(&chand->mu);
- invoke_lr_fn_locked(chand->lrc, &lr_call_data);
- gpr_mu_unlock(&chand->mu);
+ (intptr_t)chand->id,
+ (intptr_t)calld->id,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
}
/* Destructor for call_data */
@@ -132,13 +140,18 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
- grpc_load_reporting_call_data lr_call_data = {
- GRPC_LR_POINT_CALL_DESTRUCTION, final_info, calld->initial_md_string,
- calld->trailing_md_string, calld->service_method};
+ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION,
+ (intptr_t)chand->id,
+ (intptr_t)calld->id,
+ final_info,
+ calld->initial_md_string,
+ calld->trailing_md_string,
+ calld->service_method};
- gpr_mu_lock(&chand->mu);
- invoke_lr_fn_locked(chand->lrc, &lr_call_data);
- gpr_mu_unlock(&chand->mu);
+ invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
+
+ gpr_free(calld->initial_md_string);
+ gpr_free(calld->trailing_md_string);
}
/* Constructor for channel_data */
@@ -149,24 +162,28 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
memset(chand, 0, sizeof(channel_data));
- gpr_mu_init(&chand->mu);
+
+ chand->id = (intptr_t)args->channel_stack;
for (size_t i = 0; i < args->channel_args->num_args; i++) {
if (0 == strcmp(args->channel_args->args[i].key,
GRPC_ARG_ENABLE_LOAD_REPORTING)) {
- grpc_load_reporting_config *arg_lrc =
+ grpc_load_reporting_config *arg_lr_config =
args->channel_args->args[i].value.pointer.p;
- chand->lrc = grpc_load_reporting_config_copy(arg_lrc);
- GPR_ASSERT(chand->lrc != NULL);
+ chand->lr_config = grpc_load_reporting_config_copy(arg_lr_config);
+ GPR_ASSERT(chand->lr_config != NULL);
break;
}
}
- GPR_ASSERT(chand->lrc != NULL); /* arg actually found */
+ GPR_ASSERT(chand->lr_config != NULL); /* arg actually found */
grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION,
- NULL, NULL, NULL, NULL};
- gpr_mu_lock(&chand->mu);
- invoke_lr_fn_locked(chand->lrc, &lr_call_data);
- gpr_mu_unlock(&chand->mu);
+ (intptr_t)chand,
+ 0,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
}
/* Destructor for channel data */
@@ -174,10 +191,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
grpc_load_reporting_call_data lr_call_data = {
- GRPC_LR_POINT_CHANNEL_DESTRUCTION, NULL, NULL, NULL, NULL};
- invoke_lr_fn_locked(chand->lrc, &lr_call_data);
- gpr_mu_destroy(&chand->mu);
- grpc_load_reporting_config_destroy(chand->lrc);
+ GRPC_LR_POINT_CHANNEL_DESTRUCTION,
+ (intptr_t)chand->id,
+ 0,
+ NULL,
+ NULL,
+ NULL,
+ NULL};
+ invoke_lr_fn_locked(chand->lr_config, &lr_call_data);
+ grpc_load_reporting_config_destroy(chand->lr_config);
}
static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
@@ -186,7 +208,6 @@ static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) {
if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) {
calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value));
- return NULL;
}
return md;
@@ -199,10 +220,9 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
call_data *calld = elem->call_data;
if (op->recv_initial_metadata) {
- /* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->ops_recv_initial_metadata_ready =
- op->recv_initial_metadata_ready;
+ /* substitute our callback for the higher callback */
+ calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready;
op->recv_initial_metadata_ready = &calld->on_initial_md_ready;
} else if (op->send_trailing_metadata) {
grpc_metadata_batch_filter(op->send_trailing_metadata,
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index d25917811d..6a18c3dc4d 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -77,6 +77,7 @@ typedef struct {
gpr_timespec latency; /* From call creating to enqueing of received status */
} grpc_call_stats;
+/** Information about the call upon completion. */
typedef struct {
grpc_call_stats stats;
grpc_status_code final_status;
@@ -123,7 +124,7 @@ typedef struct {
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
\a and_free_memory that should be passed to gpr_free when destruction
- is complete. \a final_info contains data about the completed code, mainly
+ is complete. \a final_info contains data about the completed call, mainly
for reporting purposes. */
void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info* final_info,