diff options
Diffstat (limited to 'src/core/lib')
-rw-r--r-- | src/core/lib/channel/channel_args.c | 12 | ||||
-rw-r--r-- | src/core/lib/channel/channel_args.h | 4 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 4 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 4 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 76 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.h | 33 | ||||
-rw-r--r-- | src/core/lib/channel/handshaker.c | 2 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 60 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_poll_posix.c | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/timer.h | 17 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 13 | ||||
-rw-r--r-- | src/core/lib/transport/mdstr_hash_table.c | 142 | ||||
-rw-r--r-- | src/core/lib/transport/mdstr_hash_table.h | 83 |
13 files changed, 405 insertions, 54 deletions
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c index 3a56b1ff20..2957d2c818 100644 --- a/src/core/lib/channel/channel_args.c +++ b/src/core/lib/channel/channel_args.c @@ -272,6 +272,18 @@ int grpc_channel_args_compare(const grpc_channel_args *a, return 0; } +const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, + const char *name) { + if (args != NULL) { + for (size_t i = 0; i < args->num_args; ++i) { + if (strcmp(args->args[i].key, name) == 0) { + return &args->args[i]; + } + } + } + return NULL; +} + int grpc_channel_arg_get_integer(grpc_arg *arg, grpc_integer_options options) { if (arg->type != GRPC_ARG_INTEGER) { gpr_log(GPR_ERROR, "%s ignored: it must be an integer", arg->key); diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h index 586a296d1f..a80340c0fa 100644 --- a/src/core/lib/channel/channel_args.h +++ b/src/core/lib/channel/channel_args.h @@ -89,6 +89,10 @@ uint32_t grpc_channel_args_compression_algorithm_get_states( int grpc_channel_args_compare(const grpc_channel_args *a, const grpc_channel_args *b); +/** Returns the value of argument \a name from \a args, or NULL if not found. */ +const grpc_arg *grpc_channel_args_find(const grpc_channel_args *args, + const char *name); + typedef struct grpc_integer_options { int default_value; // Return this if value is outside of expected bounds. int min_value; diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 57d34d9e9a..2c5367901d 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -162,7 +162,7 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - gpr_timespec deadline, grpc_call_stack *call_stack) { + grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; size_t count = channel_stack->count; @@ -179,10 +179,12 @@ grpc_error *grpc_call_stack_init( /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; + args.start_time = gpr_now(GPR_CLOCK_MONOTONIC); for (i = 0; i < count; i++) { args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; + args.path = path; args.deadline = deadline; call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 1cfe2885d8..27f3be7b29 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -74,6 +74,8 @@ typedef struct { grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; + grpc_mdstr *path; + gpr_timespec start_time; gpr_timespec deadline; } grpc_call_element_args; @@ -225,7 +227,7 @@ grpc_error *grpc_call_stack_init( grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, grpc_call_context_element *context, const void *transport_server_data, - gpr_timespec deadline, grpc_call_stack *call_stack); + grpc_mdstr *path, gpr_timespec deadline, grpc_call_stack *call_stack); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 079b98a2f8..d2ea5250f6 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -64,30 +64,49 @@ static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, } // Starts the deadline timer. -static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem, - gpr_timespec deadline) { +static void start_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + gpr_timespec deadline) { grpc_deadline_state* deadline_state = elem->call_data; deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); - if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + // Note: We do not start the timer if there is already a timer + // pending. This should be okay, because this is only called from two + // functions exported by this module: grpc_deadline_state_start(), which + // starts the initial timer, and grpc_deadline_state_reset(), which + // cancels any pre-existing timer before starting a new one. In + // particular, we want to ensure that if grpc_deadline_state_start() + // winds up trying to start the timer after grpc_deadline_state_reset() + // has already done so, we ignore the value from the former. + if (!deadline_state->timer_pending && + gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // Take a reference to the call stack, to be owned by the timer. GRPC_CALL_STACK_REF(deadline_state->call_stack, "deadline_timer"); - gpr_mu_lock(&deadline_state->timer_mu); deadline_state->timer_pending = true; grpc_timer_init(exec_ctx, &deadline_state->timer, deadline, timer_callback, elem, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(&deadline_state->timer_mu); } } +static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem, + gpr_timespec deadline) { + grpc_deadline_state* deadline_state = elem->call_data; + gpr_mu_lock(&deadline_state->timer_mu); + start_timer_if_needed_locked(exec_ctx, elem, deadline); + gpr_mu_unlock(&deadline_state->timer_mu); +} // Cancels the deadline timer. -static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { - gpr_mu_lock(&deadline_state->timer_mu); +static void cancel_timer_if_needed_locked(grpc_exec_ctx* exec_ctx, + grpc_deadline_state* deadline_state) { if (deadline_state->timer_pending) { grpc_timer_cancel(exec_ctx, &deadline_state->timer); deadline_state->timer_pending = false; } +} +static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, + grpc_deadline_state* deadline_state) { + gpr_mu_lock(&deadline_state->timer_mu); + cancel_timer_if_needed_locked(exec_ctx, deadline_state); gpr_mu_unlock(&deadline_state->timer_mu); } @@ -108,6 +127,21 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, op->on_complete = &deadline_state->on_complete; } +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_stack* call_stack) { + grpc_deadline_state* deadline_state = elem->call_data; + memset(deadline_state, 0, sizeof(*deadline_state)); + deadline_state->call_stack = call_stack; + gpr_mu_init(&deadline_state->timer_mu); +} + +void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem) { + grpc_deadline_state* deadline_state = elem->call_data; + cancel_timer_if_needed(exec_ctx, deadline_state); + gpr_mu_destroy(&deadline_state->timer_mu); +} + // Callback and associated state for starting the timer after call stack // initialization has been completed. struct start_timer_after_init_state { @@ -122,16 +156,11 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, gpr_free(state); } -void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args) { - grpc_deadline_state* deadline_state = elem->call_data; - memset(deadline_state, 0, sizeof(*deadline_state)); - deadline_state->call_stack = args->call_stack; - gpr_mu_init(&deadline_state->timer_mu); +void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec deadline) { // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. - const gpr_timespec deadline = - gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { // When the deadline passes, we indicate the failure by sending down // an op with cancel_error set. However, we can't send down any ops @@ -148,11 +177,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, } } -void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_call_element* elem) { +void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec new_deadline) { grpc_deadline_state* deadline_state = elem->call_data; - cancel_timer_if_needed(exec_ctx, deadline_state); - gpr_mu_destroy(&deadline_state->timer_mu); + gpr_mu_lock(&deadline_state->timer_mu); + cancel_timer_if_needed_locked(exec_ctx, deadline_state); + start_timer_if_needed_locked(exec_ctx, elem, new_deadline); + gpr_mu_unlock(&deadline_state->timer_mu); } void grpc_deadline_state_client_start_transport_stream_op( @@ -209,7 +240,8 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element_args* args) { // Note: size of call data is different between client and server. memset(elem->call_data, 0, elem->filter->sizeof_call_data); - grpc_deadline_state_init(exec_ctx, elem, args); + grpc_deadline_state_init(exec_ctx, elem, args->call_stack); + grpc_deadline_state_start(exec_ctx, elem, args->deadline); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 685df87761..716a852565 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -54,18 +54,37 @@ typedef struct grpc_deadline_state { grpc_closure* next_on_complete; } grpc_deadline_state; -// To be used in a filter's init_call_elem(), destroy_call_elem(), and -// start_transport_stream_op() methods to enforce call deadlines. // -// REQUIRES: The first field in elem->call_data is a grpc_deadline_state. +// NOTE: All of these functions require that the first field in +// elem->call_data is a grpc_deadline_state. // -// For grpc_deadline_state_client_start_transport_stream_op(), it is the -// caller's responsibility to chain to the next filter if necessary -// after the function returns. + void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, - grpc_call_element_args* args); + grpc_call_stack* call_stack); void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem); + +// Starts the timer with the specified deadline. +// Should be called from the filter's init_call_elem() method. +void grpc_deadline_state_start(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec deadline); + +// Cancels the existing timer and starts a new one with new_deadline. +// +// Note: It is generally safe to call this with an earlier deadline +// value than the current one, but not the reverse. No checks are done +// to ensure that the timer callback is not invoked while it is in the +// process of being reset, which means that attempting to increase the +// deadline may result in the timer being called twice. +void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + gpr_timespec new_deadline); + +// To be called from the client-side filter's start_transport_stream_op() +// method. Ensures that the deadline timer is cancelled when the call +// is completed. +// +// Note: It is the caller's responsibility to chain to the next filter if +// necessary after this function returns. void grpc_deadline_state_client_start_transport_stream_op( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op); diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 8f9fb17a31..0d759887bc 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -183,7 +183,7 @@ void grpc_handshake_manager_do_handshake( gpr_timespec deadline, grpc_tcp_server_acceptor* acceptor, grpc_handshaker_done_cb cb, void* user_data) { grpc_channel_args* args_copy = grpc_channel_args_copy(args); - gpr_slice_buffer* read_buffer = malloc(sizeof(*read_buffer)); + gpr_slice_buffer* read_buffer = gpr_malloc(sizeof(*read_buffer)); gpr_slice_buffer_init(read_buffer); if (mgr->count == 0) { // No handshakers registered, so we just immediately call the done diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index f067a3a51c..1382f19945 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -38,6 +38,7 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/ext/client_config/method_config.h" #include "src/core/lib/channel/channel_args.h" #define DEFAULT_MAX_SEND_MESSAGE_LENGTH -1 // Unlimited. @@ -45,6 +46,8 @@ #define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) typedef struct call_data { + int max_send_size; + int max_recv_size; // Receive closures are chained: we inject this closure as the // recv_message_ready up-call on transport_stream_op, and remember to // call our next_recv_message_ready member after handling it. @@ -58,6 +61,8 @@ typedef struct call_data { typedef struct channel_data { int max_send_size; int max_recv_size; + // Method config table. + grpc_method_config_table* method_config_table; } channel_data; // Callback invoked when we receive a message. Here we check the max @@ -66,13 +71,12 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { grpc_call_element* elem = user_data; call_data* calld = elem->call_data; - channel_data* chand = elem->channel_data; - if (*calld->recv_message != NULL && chand->max_recv_size >= 0 && - (*calld->recv_message)->length > (size_t)chand->max_recv_size) { + if (*calld->recv_message != NULL && calld->max_recv_size >= 0 && + (*calld->recv_message)->length > (size_t)calld->max_recv_size) { char* message_string; gpr_asprintf(&message_string, "Received message larger than max (%u vs. %d)", - (*calld->recv_message)->length, chand->max_recv_size); + (*calld->recv_message)->length, calld->max_recv_size); grpc_error* new_error = grpc_error_set_int( GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INVALID_ARGUMENT); @@ -93,13 +97,12 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { call_data* calld = elem->call_data; - channel_data* chand = elem->channel_data; // Check max send message size. - if (op->send_message != NULL && chand->max_send_size >= 0 && - op->send_message->length > (size_t)chand->max_send_size) { + if (op->send_message != NULL && calld->max_send_size >= 0 && + op->send_message->length > (size_t)calld->max_send_size) { char* message_string; gpr_asprintf(&message_string, "Sent message larger than max (%u vs. %d)", - op->send_message->length, chand->max_send_size); + op->send_message->length, calld->max_send_size); gpr_slice message = gpr_slice_from_copied_string(message_string); gpr_free(message_string); grpc_call_element_send_close_with_message( @@ -119,9 +122,37 @@ static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_element_args* args) { + channel_data* chand = elem->channel_data; call_data* calld = elem->call_data; calld->next_recv_message_ready = NULL; grpc_closure_init(&calld->recv_message_ready, recv_message_ready, elem); + // Get max sizes from channel data, then merge in per-method config values. + // Note: Per-method config is only available on the client, so we + // apply the max request size to the send limit and the max response + // size to the receive limit. + calld->max_send_size = chand->max_send_size; + calld->max_recv_size = chand->max_recv_size; + if (chand->method_config_table != NULL) { + grpc_method_config* method_config = + grpc_method_config_table_get_method_config(chand->method_config_table, + args->path); + if (method_config != NULL) { + const int32_t* max_request_message_bytes = + grpc_method_config_get_max_request_message_bytes(method_config); + if (max_request_message_bytes != NULL && + (*max_request_message_bytes < calld->max_send_size || + calld->max_send_size < 0)) { + calld->max_send_size = *max_request_message_bytes; + } + const int32_t* max_response_message_bytes = + grpc_method_config_get_max_response_message_bytes(method_config); + if (max_response_message_bytes != NULL && + (*max_response_message_bytes < calld->max_recv_size || + calld->max_recv_size < 0)) { + calld->max_recv_size = *max_response_message_bytes; + } + } + } return GRPC_ERROR_NONE; } @@ -155,11 +186,22 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_arg_get_integer(&args->channel_args->args[i], options); } } + // Get method config table from channel args. + const grpc_arg* channel_arg = + grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); + if (channel_arg != NULL) { + GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + chand->method_config_table = grpc_method_config_table_ref( + (grpc_method_config_table*)channel_arg->value.pointer.p); + } } // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, - grpc_channel_element* elem) {} + grpc_channel_element* elem) { + channel_data* chand = elem->channel_data; + grpc_method_config_table_unref(chand->method_config_table); +} const grpc_channel_filter grpc_message_size_filter = { start_transport_stream_op, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 351b069613..27e966c18c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -985,8 +985,15 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (errno != EINTR) { work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } + for (i = 2; i < pfd_count; i++) { - fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); + if (watchers[i].fd == NULL) { + fd_end_poll(exec_ctx, &watchers[i], 0, 0, NULL); + } else { + // Wake up all the file descriptors, if we have an invalid one + // we can identify it on the next pollset_work() + fd_end_poll(exec_ctx, &watchers[i], 1, 1, pollset); + } } } else if (r == 0) { for (i = 2; i < pfd_count; i++) { diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index a825d2a28b..5a9a177963 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -49,11 +49,11 @@ typedef struct grpc_timer { } grpc_timer; /* Initialize *timer. When expired or canceled, timer_cb will be called with - *timer_cb_arg and status to indicate if it expired (SUCCESS) or was - canceled (CANCELLED). timer_cb is guaranteed to be called exactly once, - and application code should check the status to determine how it was - invoked. The application callback is also responsible for maintaining - information about when to free up any user-level state. */ + *timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or + was canceled (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called + exactly once, and application code should check the error to determine + how it was invoked. The application callback is also responsible for + maintaining information about when to free up any user-level state. */ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, void *timer_cb_arg, gpr_timespec now); @@ -74,8 +74,8 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, In all of these cases, the cancellation is still considered successful. They are essentially distinguished in that the timer_cb will be run - exactly once from either the cancellation (with status CANCELLED) - or from the activation (with status SUCCESS) + exactly once from either the cancellation (with error GRPC_ERROR_CANCELLED) + or from the activation (with error GRPC_ERROR_NONE). Note carefully that the callback function MAY occur in the same callstack as grpc_timer_cancel. It's expected that most timers will be cancelled (their @@ -83,14 +83,13 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, that cancellation costs as little as possible. Making callbacks run inline matches this aim. - Requires: cancel() must happen after add() on a given timer */ + Requires: cancel() must happen after init() on a given timer */ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); /* iomgr internal api for dealing with timers */ /* Check for timers to be run, and run them. Return true if timer callbacks were executed. - Drops drop_mu if it is non-null before executing callbacks. If next is non-null, TRY to update *next with the next running timer IF that timer occurs before *next current value. *next is never guaranteed to be updated on any given execution; however, diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index c4effa9a05..30559304c6 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -241,11 +241,15 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, /* Always support no compression */ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE); call->is_client = args->server_transport_data == NULL; + grpc_mdstr *path = NULL; if (call->is_client) { GPR_ASSERT(args->add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT); for (i = 0; i < args->add_initial_metadata_count; i++) { call->send_extra_metadata[i].md = args->add_initial_metadata[i]; + if (args->add_initial_metadata[i]->key == GRPC_MDSTR_PATH) { + path = GRPC_MDSTR_REF(args->add_initial_metadata[i]->value); + } } call->send_extra_metadata_count = (int)args->add_initial_metadata_count; } else { @@ -306,9 +310,10 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ - grpc_error *error = grpc_call_stack_init( - &exec_ctx, channel_stack, 1, destroy_call, call, call->context, - args->server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call)); + grpc_error *error = + grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call, + call->context, args->server_transport_data, path, + send_deadline, CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { grpc_status_code status; const char *error_str; @@ -332,6 +337,8 @@ grpc_error *grpc_call_create(const grpc_call_create_args *args, &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); } + if (path != NULL) GRPC_MDSTR_UNREF(path); + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_create", 0); return error; diff --git a/src/core/lib/transport/mdstr_hash_table.c b/src/core/lib/transport/mdstr_hash_table.c new file mode 100644 index 0000000000..4be0536dd7 --- /dev/null +++ b/src/core/lib/transport/mdstr_hash_table.c @@ -0,0 +1,142 @@ +// +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +#include "src/core/lib/transport/mdstr_hash_table.h" + +#include <stdbool.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/transport/metadata.h" + +struct grpc_mdstr_hash_table { + gpr_refcount refs; + size_t num_entries; + grpc_mdstr_hash_table_entry* entries; +}; + +// Helper function for insert and get operations that performs quadratic +// probing (https://en.wikipedia.org/wiki/Quadratic_probing). +static size_t grpc_mdstr_hash_table_find_index( + const grpc_mdstr_hash_table* table, const grpc_mdstr* key, + bool find_empty) { + for (size_t i = 0; i < table->num_entries; ++i) { + const size_t idx = (key->hash + i * i) % table->num_entries; + if (table->entries[idx].key == NULL) + return find_empty ? idx : table->num_entries; + if (table->entries[idx].key == key) return idx; + } + return table->num_entries; // Not found. +} + +static void grpc_mdstr_hash_table_add( + grpc_mdstr_hash_table* table, grpc_mdstr* key, void* value, + const grpc_mdstr_hash_table_vtable* vtable) { + GPR_ASSERT(value != NULL); + const size_t idx = + grpc_mdstr_hash_table_find_index(table, key, true /* find_empty */); + GPR_ASSERT(idx != table->num_entries); // Table should never be full. + grpc_mdstr_hash_table_entry* entry = &table->entries[idx]; + entry->key = GRPC_MDSTR_REF(key); + entry->value = vtable->copy_value(value); + entry->vtable = vtable; +} + +grpc_mdstr_hash_table* grpc_mdstr_hash_table_create( + size_t num_entries, grpc_mdstr_hash_table_entry* entries) { + grpc_mdstr_hash_table* table = gpr_malloc(sizeof(*table)); + memset(table, 0, sizeof(*table)); + gpr_ref_init(&table->refs, 1); + // Quadratic probing gets best performance when the table is no more + // than half full. + table->num_entries = num_entries * 2; + const size_t entry_size = + sizeof(grpc_mdstr_hash_table_entry) * table->num_entries; + table->entries = gpr_malloc(entry_size); + memset(table->entries, 0, entry_size); + for (size_t i = 0; i < num_entries; ++i) { + grpc_mdstr_hash_table_entry* entry = &entries[i]; + grpc_mdstr_hash_table_add(table, entry->key, entry->value, entry->vtable); + } + return table; +} + +grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table) { + if (table != NULL) gpr_ref(&table->refs); + return table; +} + +int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) { + if (table != NULL && gpr_unref(&table->refs)) { + for (size_t i = 0; i < table->num_entries; ++i) { + grpc_mdstr_hash_table_entry* entry = &table->entries[i]; + if (entry->key != NULL) { + GRPC_MDSTR_UNREF(entry->key); + entry->vtable->destroy_value(entry->value); + } + } + gpr_free(table->entries); + gpr_free(table); + return 1; + } + return 0; +} + +void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, + const grpc_mdstr* key) { + const size_t idx = + grpc_mdstr_hash_table_find_index(table, key, false /* find_empty */); + if (idx == table->num_entries) return NULL; // Not found. + return table->entries[idx].value; +} + +int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1, + const grpc_mdstr_hash_table* table2) { + // Compare by num_entries. + if (table1->num_entries < table2->num_entries) return -1; + if (table1->num_entries > table2->num_entries) return 1; + for (size_t i = 0; i < table1->num_entries; ++i) { + grpc_mdstr_hash_table_entry* e1 = &table1->entries[i]; + grpc_mdstr_hash_table_entry* e2 = &table2->entries[i]; + // Compare keys by hash value. + if (e1->key->hash < e2->key->hash) return -1; + if (e1->key->hash > e2->key->hash) return 1; + // Compare by vtable (pointer equality). + if (e1->vtable < e2->vtable) return -1; + if (e1->vtable > e2->vtable) return 1; + // Compare values via vtable. + const int value_result = e1->vtable->compare_value(e1->value, e2->value); + if (value_result != 0) return value_result; + } + return 0; +} diff --git a/src/core/lib/transport/mdstr_hash_table.h b/src/core/lib/transport/mdstr_hash_table.h new file mode 100644 index 0000000000..52e5b023db --- /dev/null +++ b/src/core/lib/transport/mdstr_hash_table.h @@ -0,0 +1,83 @@ +/* + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H +#define GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H + +#include "src/core/lib/transport/metadata.h" + +/** Hash table implementation. + * + * This implementation uses open addressing + * (https://en.wikipedia.org/wiki/Open_addressing) with quadratic + * probing (https://en.wikipedia.org/wiki/Quadratic_probing). + * + * The keys are \a grpc_mdstr objects. The values are arbitrary pointers + * with a common vtable. + * + * Hash tables are intentionally immutable, to avoid the need for locking. + */ + +typedef struct grpc_mdstr_hash_table grpc_mdstr_hash_table; + +typedef struct grpc_mdstr_hash_table_vtable { + void (*destroy_value)(void* value); + void* (*copy_value)(void* value); + int (*compare_value)(void* value1, void* value2); +} grpc_mdstr_hash_table_vtable; + +typedef struct grpc_mdstr_hash_table_entry { + grpc_mdstr* key; + void* value; /* Must not be NULL. */ + const grpc_mdstr_hash_table_vtable* vtable; +} grpc_mdstr_hash_table_entry; + +/** Creates a new hash table of containing \a entries, which is an array + of length \a num_entries. + Creates its own copy of all keys and values from \a entries. */ +grpc_mdstr_hash_table* grpc_mdstr_hash_table_create( + size_t num_entries, grpc_mdstr_hash_table_entry* entries); + +grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table); +/** Returns 1 when \a table is destroyed. */ +int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table); + +/** Returns the value from \a table associated with \a key. + Returns NULL if \a key is not found. */ +void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, + const grpc_mdstr* key); + +/** Compares two hash tables. + The sort order is stable but undefined. */ +int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1, + const grpc_mdstr_hash_table* table2); + +#endif /* GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H */ |