diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/load_reporting/load_reporting.c | 20 | ||||
-rw-r--r-- | src/core/ext/load_reporting/load_reporting.h | 40 | ||||
-rw-r--r-- | src/core/ext/load_reporting/load_reporting_filter.c | 104 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 3 |
4 files changed, 103 insertions, 64 deletions
diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c index 9e4d32676f..1a1562d058 100644 --- a/src/core/ext/load_reporting/load_reporting.c +++ b/src/core/ext/load_reporting/load_reporting.c @@ -50,11 +50,11 @@ struct grpc_load_reporting_config { grpc_load_reporting_config *grpc_load_reporting_config_create( grpc_load_reporting_fn fn, void *user_data) { GPR_ASSERT(fn != NULL); - grpc_load_reporting_config *lrc = + grpc_load_reporting_config *lr_config = gpr_malloc(sizeof(grpc_load_reporting_config)); - lrc->fn = fn; - lrc->user_data = user_data; - return lrc; + lr_config->fn = fn; + lr_config->user_data = user_data; + return lr_config; } grpc_load_reporting_config *grpc_load_reporting_config_copy( @@ -62,14 +62,14 @@ grpc_load_reporting_config *grpc_load_reporting_config_copy( return grpc_load_reporting_config_create(src->fn, src->user_data); } -void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc) { - gpr_free(lrc); +void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lr_config) { + gpr_free(lr_config); } void grpc_load_reporting_config_call( - grpc_load_reporting_config *lrc, + grpc_load_reporting_config *lr_config, const grpc_load_reporting_call_data *call_data) { - lrc->fn(call_data, lrc->user_data); + lr_config->fn(call_data, lr_config->user_data); } static bool is_load_reporting_enabled(const grpc_channel_args *a) { @@ -110,11 +110,11 @@ static const grpc_arg_pointer_vtable lrd_ptr_vtable = { lrd_arg_copy, lrd_arg_destroy, lrd_arg_cmp}; grpc_arg grpc_load_reporting_config_create_arg( - grpc_load_reporting_config *lrc) { + grpc_load_reporting_config *lr_config) { grpc_arg arg; arg.type = GRPC_ARG_POINTER; arg.key = GRPC_ARG_ENABLE_LOAD_REPORTING; - arg.value.pointer.p = lrc; + arg.value.pointer.p = lr_config; arg.value.pointer.vtable = &lrd_ptr_vtable; return arg; } diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/load_reporting/load_reporting.h index 4f3ecd3661..71e5a7c0ab 100644 --- a/src/core/ext/load_reporting/load_reporting.h +++ b/src/core/ext/load_reporting/load_reporting.h @@ -37,8 +37,15 @@ #include <grpc/impl/codegen/grpc_types.h> #include "src/core/lib/channel/channel_stack.h" +/** Metadata key for initial metadata coming from clients */ +#define GRPC_LOAD_REPORTING_INITIAL_MD_KEY "load-reporting-initial" + +/** Metadata key for trailing metadata from servers */ +#define GRPC_LOAD_REPORTING_TRAILING_MD_KEY "load-reporting-trailing" + typedef struct grpc_load_reporting_config grpc_load_reporting_config; +/** Identifiers for the invocation point of the users LR callback */ typedef enum grpc_load_reporting_source { GRPC_LR_POINT_UNKNOWN = 0, GRPC_LR_POINT_CHANNEL_CREATION, @@ -47,17 +54,31 @@ typedef enum grpc_load_reporting_source { GRPC_LR_POINT_CALL_DESTRUCTION } grpc_load_reporting_source; -/** Call information to be passed to the provided load reporting function upon - * completion of the call */ +/** Call information to be passed to the provided LR callback. */ typedef struct grpc_load_reporting_call_data { - const grpc_load_reporting_source source; + const grpc_load_reporting_source source; /**< point of last data update. */ + + // XXX + intptr_t channel_id; + intptr_t call_id; + + /** Only valid when \a source is \a GRPC_LR_POINT_CALL_DESTRUCTION, that is, + * once the call has completed */ const grpc_call_final_info *final_info; + const char *initial_md_string; /**< value string for LR's initial md key */ const char *trailing_md_string; /**< value string for LR's trailing md key */ - const char *method; /**< Corresponds to :path header */ + const char *method_name; /**< Corresponds to :path header */ } grpc_load_reporting_call_data; -/** Custom function to be called by the load reporting filter. */ +/** Return a \a grpc_arg enabling load reporting */ +grpc_arg grpc_load_reporting_config_create_arg( + grpc_load_reporting_config *lr_config); + +/** Custom function to be called by the load reporting filter. + * + * \a call_data is provided by the runtime. \a user_data is given by the user + * as part of \a grpc_load_reporting_config_create */ typedef void (*grpc_load_reporting_fn)( const grpc_load_reporting_call_data *call_data, void *user_data); @@ -73,14 +94,11 @@ grpc_load_reporting_config *grpc_load_reporting_config_create( grpc_load_reporting_config *grpc_load_reporting_config_copy( grpc_load_reporting_config *src); -void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lrc); +void grpc_load_reporting_config_destroy(grpc_load_reporting_config *lr_config); -/** Invoke the function registered by \a grpc_load_reporting_init. */ +/** Invoke the LR callback from \a lr_config with \a call_data */ void grpc_load_reporting_config_call( - grpc_load_reporting_config *lrc, + grpc_load_reporting_config *lr_config, const grpc_load_reporting_call_data *call_data); -/** Return a \a grpc_arg enabling load reporting */ -grpc_arg grpc_load_reporting_config_create_arg(grpc_load_reporting_config *lrc); - #endif /* GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H */ diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index 11a39c5b75..55e06f2774 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -31,6 +31,7 @@ * */ +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> @@ -43,37 +44,40 @@ #include "src/core/lib/transport/static_metadata.h" typedef struct call_data { - const char *trailing_md_string; - const char *initial_md_string; + intptr_t id; /**< an id unique to the call */ + char *trailing_md_string; + char *initial_md_string; const char *service_method; - grpc_metadata_batch *recv_initial_metadata; - + /* stores the recv_initial_metadata op's ready closure, which we wrap with our + * own (on_initial_md_ready) in order to capture the incoming initial metadata + * */ grpc_closure *ops_recv_initial_metadata_ready; + /* to get notified of the availability of the incoming initial metadata. */ grpc_closure on_initial_md_ready; - + grpc_metadata_batch *recv_initial_metadata; } call_data; typedef struct channel_data { - gpr_mu mu; - grpc_load_reporting_config *lrc; + intptr_t id; /**< an id unique to the channel */ + grpc_load_reporting_config *lr_config; } channel_data; -static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc, +static void invoke_lr_fn_locked(grpc_load_reporting_config *lr_config, grpc_load_reporting_call_data *lr_call_data) { GPR_TIMER_BEGIN("load_reporting_config_fn", 0); - grpc_load_reporting_config_call(lrc, lr_call_data); + grpc_load_reporting_config_call(lr_config, lr_call_data); GPR_TIMER_END("load_reporting_config_fn", 0); } typedef struct { grpc_call_element *elem; grpc_exec_ctx *exec_ctx; -} server_filter_args; +} recv_md_filter_args; static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) { - server_filter_args *a = user_data; + recv_md_filter_args *a = user_data; grpc_call_element *elem = a->elem; call_data *calld = elem->call_data; @@ -81,22 +85,22 @@ static grpc_mdelem *recv_md_filter(void *user_data, grpc_mdelem *md) { calld->service_method = grpc_mdstr_as_c_string(md->value); } else if (md->key == GRPC_MDSTR_LOAD_REPORTING_INITIAL) { calld->initial_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value)); - return NULL; } return md; } static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *err) { + grpc_error *err) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (err == GRPC_ERROR_NONE) { - server_filter_args a; + recv_md_filter_args a; a.elem = elem; a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter, &a); + grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter, + &a); if (calld->service_method == NULL) { err = grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header")); @@ -116,13 +120,17 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, call_data *calld = elem->call_data; memset(calld, 0, sizeof(call_data)); + calld->id = (intptr_t)args->call_stack; grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem); grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION, - NULL, NULL, NULL, NULL}; - gpr_mu_lock(&chand->mu); - invoke_lr_fn_locked(chand->lrc, &lr_call_data); - gpr_mu_unlock(&chand->mu); + (intptr_t)chand->id, + (intptr_t)calld->id, + NULL, + NULL, + NULL, + NULL}; + invoke_lr_fn_locked(chand->lr_config, &lr_call_data); } /* Destructor for call_data */ @@ -132,13 +140,18 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - grpc_load_reporting_call_data lr_call_data = { - GRPC_LR_POINT_CALL_DESTRUCTION, final_info, calld->initial_md_string, - calld->trailing_md_string, calld->service_method}; + grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION, + (intptr_t)chand->id, + (intptr_t)calld->id, + final_info, + calld->initial_md_string, + calld->trailing_md_string, + calld->service_method}; - gpr_mu_lock(&chand->mu); - invoke_lr_fn_locked(chand->lrc, &lr_call_data); - gpr_mu_unlock(&chand->mu); + invoke_lr_fn_locked(chand->lr_config, &lr_call_data); + + gpr_free(calld->initial_md_string); + gpr_free(calld->trailing_md_string); } /* Constructor for channel_data */ @@ -149,24 +162,28 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; memset(chand, 0, sizeof(channel_data)); - gpr_mu_init(&chand->mu); + + chand->id = (intptr_t)args->channel_stack; for (size_t i = 0; i < args->channel_args->num_args; i++) { if (0 == strcmp(args->channel_args->args[i].key, GRPC_ARG_ENABLE_LOAD_REPORTING)) { - grpc_load_reporting_config *arg_lrc = + grpc_load_reporting_config *arg_lr_config = args->channel_args->args[i].value.pointer.p; - chand->lrc = grpc_load_reporting_config_copy(arg_lrc); - GPR_ASSERT(chand->lrc != NULL); + chand->lr_config = grpc_load_reporting_config_copy(arg_lr_config); + GPR_ASSERT(chand->lr_config != NULL); break; } } - GPR_ASSERT(chand->lrc != NULL); /* arg actually found */ + GPR_ASSERT(chand->lr_config != NULL); /* arg actually found */ grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION, - NULL, NULL, NULL, NULL}; - gpr_mu_lock(&chand->mu); - invoke_lr_fn_locked(chand->lrc, &lr_call_data); - gpr_mu_unlock(&chand->mu); + (intptr_t)chand, + 0, + NULL, + NULL, + NULL, + NULL}; + invoke_lr_fn_locked(chand->lr_config, &lr_call_data); } /* Destructor for channel data */ @@ -174,10 +191,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; grpc_load_reporting_call_data lr_call_data = { - GRPC_LR_POINT_CHANNEL_DESTRUCTION, NULL, NULL, NULL, NULL}; - invoke_lr_fn_locked(chand->lrc, &lr_call_data); - gpr_mu_destroy(&chand->mu); - grpc_load_reporting_config_destroy(chand->lrc); + GRPC_LR_POINT_CHANNEL_DESTRUCTION, + (intptr_t)chand->id, + 0, + NULL, + NULL, + NULL, + NULL}; + invoke_lr_fn_locked(chand->lr_config, &lr_call_data); + grpc_load_reporting_config_destroy(chand->lr_config); } static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) { @@ -186,7 +208,6 @@ static grpc_mdelem *lr_trailing_md_filter(void *user_data, grpc_mdelem *md) { if (md->key == GRPC_MDSTR_LOAD_REPORTING_TRAILING) { calld->trailing_md_string = gpr_strdup(grpc_mdstr_as_c_string(md->value)); - return NULL; } return md; @@ -199,10 +220,9 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, call_data *calld = elem->call_data; if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ calld->recv_initial_metadata = op->recv_initial_metadata; - calld->ops_recv_initial_metadata_ready = - op->recv_initial_metadata_ready; + /* substitute our callback for the higher callback */ + calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->on_initial_md_ready; } else if (op->send_trailing_metadata) { grpc_metadata_batch_filter(op->send_trailing_metadata, diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index d25917811d..6a18c3dc4d 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -77,6 +77,7 @@ typedef struct { gpr_timespec latency; /* From call creating to enqueing of received status */ } grpc_call_stats; +/** Information about the call upon completion. */ typedef struct { grpc_call_stats stats; grpc_status_code final_status; @@ -123,7 +124,7 @@ typedef struct { The filter does not need to do any chaining. The bottom filter of a stack will be passed a non-NULL pointer to \a and_free_memory that should be passed to gpr_free when destruction - is complete. \a final_info contains data about the completed code, mainly + is complete. \a final_info contains data about the completed call, mainly for reporting purposes. */ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info* final_info, |