diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 141 | ||||
-rw-r--r-- | src/core/ext/client_channel/method_config.c | 56 | ||||
-rw-r--r-- | src/core/ext/client_channel/method_config.h | 24 | ||||
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 74 | ||||
-rw-r--r-- | src/core/lib/transport/mdstr_hash_table.c | 37 | ||||
-rw-r--r-- | src/core/lib/transport/mdstr_hash_table.h | 9 | ||||
-rw-r--r-- | src/csharp/Grpc.Core/Server.cs | 3 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/InteropClient.cs | 16 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs | 8 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/Test.cs | 37 | ||||
-rw-r--r-- | src/csharp/Grpc.IntegrationTesting/TestGrpc.cs | 105 | ||||
-rw-r--r-- | src/node/interop/interop_client.js | 16 | ||||
-rw-r--r-- | src/node/test/interop_sanity_test.js | 4 |
13 files changed, 415 insertions, 115 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index dfa84b0d77..9399b4f1a5 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -61,6 +61,54 @@ /* Client channel implementation */ /************************************************************************* + * METHOD-CONFIG TABLE + */ + +typedef enum { + WAIT_FOR_READY_UNSET, + WAIT_FOR_READY_FALSE, + WAIT_FOR_READY_TRUE +} wait_for_ready_value; + +typedef struct method_parameters { + gpr_timespec timeout; + wait_for_ready_value wait_for_ready; +} method_parameters; + +static void *method_parameters_copy(void *value) { + void *new_value = gpr_malloc(sizeof(method_parameters)); + memcpy(new_value, value, sizeof(method_parameters)); + return new_value; +} + +static int method_parameters_cmp(void *value1, void *value2) { + const method_parameters *v1 = value1; + const method_parameters *v2 = value2; + const int retval = gpr_time_cmp(v1->timeout, v2->timeout); + if (retval != 0) return retval; + if (v1->wait_for_ready > v2->wait_for_ready) return 1; + if (v1->wait_for_ready < v2->wait_for_ready) return -1; + return 0; +} + +static const grpc_mdstr_hash_table_vtable method_parameters_vtable = { + gpr_free, method_parameters_copy, method_parameters_cmp}; + +static void *method_config_convert_value( + const grpc_method_config *method_config) { + method_parameters *value = gpr_malloc(sizeof(method_parameters)); + const gpr_timespec *timeout = grpc_method_config_get_timeout(method_config); + value->timeout = timeout != NULL ? *timeout : gpr_time_0(GPR_TIMESPAN); + const bool *wait_for_ready = + grpc_method_config_get_wait_for_ready(method_config); + value->wait_for_ready = + wait_for_ready == NULL + ? WAIT_FOR_READY_UNSET + : (wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE); + return value; +} + +/************************************************************************* * CHANNEL-WIDE FUNCTIONS */ @@ -76,8 +124,8 @@ typedef struct client_channel_channel_data { gpr_mu mu; /** currently active load balancer */ grpc_lb_policy *lb_policy; - /** method config table */ - grpc_method_config_table *method_config_table; + /** maps method names to method_parameters structs */ + grpc_mdstr_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ grpc_channel_args *resolver_result; /** a list of closures that are all waiting for config to come in */ @@ -177,7 +225,7 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; - grpc_method_config_table *method_config_table = NULL; + grpc_mdstr_hash_table *method_params_table = NULL; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; bool exit_idle = false; grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy"); @@ -237,8 +285,9 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_SERVICE_CONFIG); if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); - method_config_table = grpc_method_config_table_ref( - (grpc_method_config_table *)channel_arg->value.pointer.p); + method_params_table = grpc_method_config_table_convert( + (grpc_method_config_table *)channel_arg->value.pointer.p, + method_config_convert_value, &method_parameters_vtable); } grpc_channel_args_destroy(chand->resolver_result); chand->resolver_result = NULL; @@ -252,10 +301,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&chand->mu); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (chand->method_config_table != NULL) { - grpc_method_config_table_unref(chand->method_config_table); + if (chand->method_params_table != NULL) { + grpc_mdstr_hash_table_unref(chand->method_params_table); } - chand->method_config_table = method_config_table; + chand->method_params_table = method_params_table; if (lb_policy != NULL) { grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); @@ -416,8 +465,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - if (chand->method_config_table != NULL) { - grpc_method_config_table_unref(chand->method_config_table); + if (chand->method_params_table != NULL) { + grpc_mdstr_hash_table_unref(chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); @@ -455,11 +504,7 @@ typedef struct client_channel_call_data { grpc_mdstr *path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; - enum { - WAIT_FOR_READY_UNSET, - WAIT_FOR_READY_FALSE, - WAIT_FOR_READY_TRUE - } wait_for_ready_from_service_config; + wait_for_ready_value wait_for_ready_from_service_config; grpc_closure read_service_config; grpc_error *cancel_error; @@ -853,41 +898,39 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, if (error == GRPC_ERROR_NONE) { // Get the method config table from channel data. gpr_mu_lock(&chand->mu); - grpc_method_config_table *method_config_table = NULL; - if (chand->method_config_table != NULL) { - method_config_table = - grpc_method_config_table_ref(chand->method_config_table); + grpc_mdstr_hash_table *method_params_table = NULL; + if (chand->method_params_table != NULL) { + method_params_table = + grpc_mdstr_hash_table_ref(chand->method_params_table); } gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. - if (method_config_table != NULL) { - const grpc_method_config *method_config = - grpc_method_config_table_get_method_config(method_config_table, - calld->path); - if (method_config != NULL) { - const gpr_timespec *per_method_timeout = - grpc_method_config_get_timeout(method_config); - const bool *wait_for_ready = - grpc_method_config_get_wait_for_ready(method_config); - if (per_method_timeout != NULL || wait_for_ready != NULL) { + if (method_params_table != NULL) { + const method_parameters *method_params = + grpc_method_config_table_get(method_params_table, calld->path); + if (method_params != NULL) { + const bool have_method_timeout = + gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; + if (have_method_timeout || + method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { gpr_mu_lock(&calld->mu); - if (per_method_timeout != NULL) { - gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, *per_method_timeout); + if (have_method_timeout) { + const gpr_timespec per_method_deadline = + gpr_time_add(calld->call_start_time, method_params->timeout); if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { calld->deadline = per_method_deadline; // Reset deadline timer. grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); } } - if (wait_for_ready != NULL) { + if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { calld->wait_for_ready_from_service_config = - *wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE; + method_params->wait_for_ready; } gpr_mu_unlock(&calld->mu); } } - grpc_method_config_table_unref(method_config_table); + grpc_mdstr_hash_table_unref(method_params_table); } } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); @@ -924,29 +967,25 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&chand->mu); if (chand->lb_policy != NULL) { // We already have a resolver result, so check for service config. - if (chand->method_config_table != NULL) { - grpc_method_config_table *method_config_table = - grpc_method_config_table_ref(chand->method_config_table); + if (chand->method_params_table != NULL) { + grpc_mdstr_hash_table *method_params_table = + grpc_mdstr_hash_table_ref(chand->method_params_table); gpr_mu_unlock(&chand->mu); - grpc_method_config *method_config = - grpc_method_config_table_get_method_config(method_config_table, - args->path); - if (method_config != NULL) { - const gpr_timespec *per_method_timeout = - grpc_method_config_get_timeout(method_config); - if (per_method_timeout != NULL) { + method_parameters *method_params = + grpc_method_config_table_get(method_params_table, args->path); + if (method_params != NULL) { + if (gpr_time_cmp(method_params->timeout, + gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, *per_method_timeout); + gpr_time_add(calld->call_start_time, method_params->timeout); calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); } - const bool *wait_for_ready = - grpc_method_config_get_wait_for_ready(method_config); - if (wait_for_ready != NULL) { + if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { calld->wait_for_ready_from_service_config = - *wait_for_ready ? WAIT_FOR_READY_TRUE : WAIT_FOR_READY_FALSE; + method_params->wait_for_ready; } } - grpc_method_config_table_unref(method_config_table); + grpc_mdstr_hash_table_unref(method_params_table); } else { gpr_mu_unlock(&chand->mu); } diff --git a/src/core/ext/client_channel/method_config.c b/src/core/ext/client_channel/method_config.c index 49d1ad344e..4313ad5e0e 100644 --- a/src/core/ext/client_channel/method_config.c +++ b/src/core/ext/client_channel/method_config.c @@ -254,12 +254,12 @@ int grpc_method_config_table_cmp(const grpc_method_config_table* table1, return grpc_mdstr_hash_table_cmp(table1, table2); } -grpc_method_config* grpc_method_config_table_get_method_config( - const grpc_method_config_table* table, const grpc_mdstr* path) { - grpc_method_config* method_config = grpc_mdstr_hash_table_get(table, path); +void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table, + const grpc_mdstr* path) { + void* value = grpc_mdstr_hash_table_get(table, path); // If we didn't find a match for the path, try looking for a wildcard // entry (i.e., change "/service/method" to "/service/*"). - if (method_config == NULL) { + if (value == NULL) { const char* path_str = grpc_mdstr_as_c_string(path); const char* sep = strrchr(path_str, '/') + 1; const size_t len = (size_t)(sep - path_str); @@ -269,10 +269,10 @@ grpc_method_config* grpc_method_config_table_get_method_config( buf[len + 1] = '\0'; grpc_mdstr* wildcard_path = grpc_mdstr_from_string(buf); gpr_free(buf); - method_config = grpc_mdstr_hash_table_get(table, wildcard_path); + value = grpc_mdstr_hash_table_get(table, wildcard_path); GRPC_MDSTR_UNREF(wildcard_path); } - return method_config; + return value; } static void* copy_arg(void* p) { return grpc_method_config_table_ref(p); } @@ -294,3 +294,47 @@ grpc_arg grpc_method_config_table_create_channel_arg( arg.value.pointer.vtable = &arg_vtable; return arg; } + +// State used by convert_entry() below. +typedef struct conversion_state { + void* (*convert_value)(const grpc_method_config* method_config); + const grpc_mdstr_hash_table_vtable* vtable; + size_t num_entries; + grpc_mdstr_hash_table_entry* entries; +} conversion_state; + +// A function to be passed to grpc_mdstr_hash_table_iterate() to create +// a copy of the entries. +static void convert_entry(const grpc_mdstr_hash_table_entry* entry, + void* user_data) { + conversion_state* state = user_data; + state->entries[state->num_entries].key = GRPC_MDSTR_REF(entry->key); + state->entries[state->num_entries].value = state->convert_value(entry->value); + state->entries[state->num_entries].vtable = state->vtable; + ++state->num_entries; +} + +grpc_mdstr_hash_table* grpc_method_config_table_convert( + const grpc_method_config_table* table, + void* (*convert_value)(const grpc_method_config* method_config), + const grpc_mdstr_hash_table_vtable* vtable) { + // Create an array of the entries in the table with converted values. + conversion_state state; + state.convert_value = convert_value; + state.vtable = vtable; + state.num_entries = 0; + state.entries = gpr_malloc(sizeof(grpc_mdstr_hash_table_entry) * + grpc_mdstr_hash_table_num_entries(table)); + grpc_mdstr_hash_table_iterate(table, convert_entry, &state); + // Create a new table based on the array we just constructed. + grpc_mdstr_hash_table* new_table = + grpc_mdstr_hash_table_create(state.num_entries, state.entries); + // Clean up the array. + for (size_t i = 0; i < state.num_entries; ++i) { + GRPC_MDSTR_UNREF(state.entries[i].key); + vtable->destroy_value(state.entries[i].value); + } + gpr_free(state.entries); + // Return the new table. + return new_table; +} diff --git a/src/core/ext/client_channel/method_config.h b/src/core/ext/client_channel/method_config.h index 75b32fcb17..4cbeee5625 100644 --- a/src/core/ext/client_channel/method_config.h +++ b/src/core/ext/client_channel/method_config.h @@ -106,11 +106,31 @@ int grpc_method_config_table_cmp(const grpc_method_config_table* table1, /// the form "/service/method". /// Returns NULL if the method has no config. /// Caller does NOT own a reference to the result. -grpc_method_config* grpc_method_config_table_get_method_config( - const grpc_method_config_table* table, const grpc_mdstr* path); +/// +/// Note: This returns a void* instead of a grpc_method_config* so that +/// it can also be used for tables constructed via +/// grpc_method_config_table_convert(). +void* grpc_method_config_table_get(const grpc_mdstr_hash_table* table, + const grpc_mdstr* path); /// Returns a channel arg containing \a table. grpc_arg grpc_method_config_table_create_channel_arg( grpc_method_config_table* table); +/// Generates a new table from \a table whose values are converted to a +/// new form via the \a convert_value function. The new table will use +/// \a vtable for its values. +/// +/// This is generally used to convert the table's value type from +/// grpc_method_config to a simple struct containing only the parameters +/// relevant to a particular filter, thus avoiding the need for a hash +/// table lookup on the fast path. In that scenario, \a convert_value +/// will return a new instance of the struct containing the values from +/// the grpc_method_config, and \a vtable provides the methods for +/// operating on the struct type. +grpc_mdstr_hash_table* grpc_method_config_table_convert( + const grpc_method_config_table* table, + void* (*convert_value)(const grpc_method_config* method_config), + const grpc_mdstr_hash_table_vtable* vtable); + #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_METHOD_CONFIG_H */ diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 69fb3819ff..b8b2546035 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -45,6 +45,44 @@ // The protobuf library will (by default) start warning at 100 megs. #define DEFAULT_MAX_RECV_MESSAGE_LENGTH (4 * 1024 * 1024) +typedef struct message_size_limits { + int max_send_size; + int max_recv_size; +} message_size_limits; + +static void* message_size_limits_copy(void* value) { + void* new_value = gpr_malloc(sizeof(message_size_limits)); + memcpy(new_value, value, sizeof(message_size_limits)); + return new_value; +} + +static int message_size_limits_cmp(void* value1, void* value2) { + const message_size_limits* v1 = value1; + const message_size_limits* v2 = value2; + if (v1->max_send_size > v2->max_send_size) return 1; + if (v1->max_send_size < v2->max_send_size) return -1; + if (v1->max_recv_size > v2->max_recv_size) return 1; + if (v1->max_recv_size < v2->max_recv_size) return -1; + return 0; +} + +static const grpc_mdstr_hash_table_vtable message_size_limits_vtable = { + gpr_free, message_size_limits_copy, message_size_limits_cmp}; + +static void* method_config_convert_value( + const grpc_method_config* method_config) { + message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); + const int32_t* max_request_message_bytes = + grpc_method_config_get_max_request_message_bytes(method_config); + value->max_send_size = + max_request_message_bytes != NULL ? *max_request_message_bytes : -1; + const int32_t* max_response_message_bytes = + grpc_method_config_get_max_response_message_bytes(method_config); + value->max_recv_size = + max_response_message_bytes != NULL ? *max_response_message_bytes : -1; + return value; +} + typedef struct call_data { int max_send_size; int max_recv_size; @@ -61,8 +99,8 @@ typedef struct call_data { typedef struct channel_data { int max_send_size; int max_recv_size; - // Method config table. - grpc_method_config_table* method_config_table; + // Maps path names to message_size_limits structs. + grpc_mdstr_hash_table* method_limit_table; } channel_data; // Callback invoked when we receive a message. Here we check the max @@ -132,24 +170,19 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // size to the receive limit. calld->max_send_size = chand->max_send_size; calld->max_recv_size = chand->max_recv_size; - if (chand->method_config_table != NULL) { - grpc_method_config* method_config = - grpc_method_config_table_get_method_config(chand->method_config_table, - args->path); - if (method_config != NULL) { - const int32_t* max_request_message_bytes = - grpc_method_config_get_max_request_message_bytes(method_config); - if (max_request_message_bytes != NULL && - (*max_request_message_bytes < calld->max_send_size || + if (chand->method_limit_table != NULL) { + message_size_limits* limits = + grpc_method_config_table_get(chand->method_limit_table, args->path); + if (limits != NULL) { + if (limits->max_send_size >= 0 && + (limits->max_send_size < calld->max_send_size || calld->max_send_size < 0)) { - calld->max_send_size = *max_request_message_bytes; + calld->max_send_size = limits->max_send_size; } - const int32_t* max_response_message_bytes = - grpc_method_config_get_max_response_message_bytes(method_config); - if (max_response_message_bytes != NULL && - (*max_response_message_bytes < calld->max_recv_size || + if (limits->max_recv_size >= 0 && + (limits->max_recv_size < calld->max_recv_size || calld->max_recv_size < 0)) { - calld->max_recv_size = *max_response_message_bytes; + calld->max_recv_size = limits->max_recv_size; } } } @@ -191,8 +224,9 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG); if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); - chand->method_config_table = grpc_method_config_table_ref( - (grpc_method_config_table*)channel_arg->value.pointer.p); + chand->method_limit_table = grpc_method_config_table_convert( + (grpc_method_config_table*)channel_arg->value.pointer.p, + method_config_convert_value, &message_size_limits_vtable); } } @@ -200,7 +234,7 @@ static void init_channel_elem(grpc_exec_ctx* exec_ctx, static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) { channel_data* chand = elem->channel_data; - grpc_method_config_table_unref(chand->method_config_table); + grpc_mdstr_hash_table_unref(chand->method_limit_table); } const grpc_channel_filter grpc_message_size_filter = { diff --git a/src/core/lib/transport/mdstr_hash_table.c b/src/core/lib/transport/mdstr_hash_table.c index 4be0536dd7..8e914c420b 100644 --- a/src/core/lib/transport/mdstr_hash_table.c +++ b/src/core/lib/transport/mdstr_hash_table.c @@ -42,6 +42,7 @@ struct grpc_mdstr_hash_table { gpr_refcount refs; size_t num_entries; + size_t size; grpc_mdstr_hash_table_entry* entries; }; @@ -50,13 +51,12 @@ struct grpc_mdstr_hash_table { static size_t grpc_mdstr_hash_table_find_index( const grpc_mdstr_hash_table* table, const grpc_mdstr* key, bool find_empty) { - for (size_t i = 0; i < table->num_entries; ++i) { - const size_t idx = (key->hash + i * i) % table->num_entries; - if (table->entries[idx].key == NULL) - return find_empty ? idx : table->num_entries; + for (size_t i = 0; i < table->size; ++i) { + const size_t idx = (key->hash + i * i) % table->size; + if (table->entries[idx].key == NULL) return find_empty ? idx : table->size; if (table->entries[idx].key == key) return idx; } - return table->num_entries; // Not found. + return table->size; // Not found. } static void grpc_mdstr_hash_table_add( @@ -65,7 +65,7 @@ static void grpc_mdstr_hash_table_add( GPR_ASSERT(value != NULL); const size_t idx = grpc_mdstr_hash_table_find_index(table, key, true /* find_empty */); - GPR_ASSERT(idx != table->num_entries); // Table should never be full. + GPR_ASSERT(idx != table->size); // Table should never be full. grpc_mdstr_hash_table_entry* entry = &table->entries[idx]; entry->key = GRPC_MDSTR_REF(key); entry->value = vtable->copy_value(value); @@ -77,11 +77,11 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_create( grpc_mdstr_hash_table* table = gpr_malloc(sizeof(*table)); memset(table, 0, sizeof(*table)); gpr_ref_init(&table->refs, 1); + table->num_entries = num_entries; // Quadratic probing gets best performance when the table is no more // than half full. - table->num_entries = num_entries * 2; - const size_t entry_size = - sizeof(grpc_mdstr_hash_table_entry) * table->num_entries; + table->size = num_entries * 2; + const size_t entry_size = sizeof(grpc_mdstr_hash_table_entry) * table->size; table->entries = gpr_malloc(entry_size); memset(table->entries, 0, entry_size); for (size_t i = 0; i < num_entries; ++i) { @@ -98,7 +98,7 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table) { int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) { if (table != NULL && gpr_unref(&table->refs)) { - for (size_t i = 0; i < table->num_entries; ++i) { + for (size_t i = 0; i < table->size; ++i) { grpc_mdstr_hash_table_entry* entry = &table->entries[i]; if (entry->key != NULL) { GRPC_MDSTR_UNREF(entry->key); @@ -112,11 +112,15 @@ int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table) { return 0; } +size_t grpc_mdstr_hash_table_num_entries(const grpc_mdstr_hash_table* table) { + return table->num_entries; +} + void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, const grpc_mdstr* key) { const size_t idx = grpc_mdstr_hash_table_find_index(table, key, false /* find_empty */); - if (idx == table->num_entries) return NULL; // Not found. + if (idx == table->size) return NULL; // Not found. return table->entries[idx].value; } @@ -140,3 +144,14 @@ int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1, } return 0; } + +void grpc_mdstr_hash_table_iterate( + const grpc_mdstr_hash_table* table, + void (*func)(const grpc_mdstr_hash_table_entry* entry, void* user_data), + void* user_data) { + for (size_t i = 0; i < table->size; ++i) { + if (table->entries[i].key != NULL) { + func(&table->entries[i], user_data); + } + } +} diff --git a/src/core/lib/transport/mdstr_hash_table.h b/src/core/lib/transport/mdstr_hash_table.h index 52e5b023db..bceb4df93d 100644 --- a/src/core/lib/transport/mdstr_hash_table.h +++ b/src/core/lib/transport/mdstr_hash_table.h @@ -70,6 +70,9 @@ grpc_mdstr_hash_table* grpc_mdstr_hash_table_ref(grpc_mdstr_hash_table* table); /** Returns 1 when \a table is destroyed. */ int grpc_mdstr_hash_table_unref(grpc_mdstr_hash_table* table); +/** Returns the number of entries in \a table. */ +size_t grpc_mdstr_hash_table_num_entries(const grpc_mdstr_hash_table* table); + /** Returns the value from \a table associated with \a key. Returns NULL if \a key is not found. */ void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, @@ -80,4 +83,10 @@ void* grpc_mdstr_hash_table_get(const grpc_mdstr_hash_table* table, int grpc_mdstr_hash_table_cmp(const grpc_mdstr_hash_table* table1, const grpc_mdstr_hash_table* table2); +/** Iterates over the entries in \a table, calling \a func for each entry. */ +void grpc_mdstr_hash_table_iterate( + const grpc_mdstr_hash_table* table, + void (*func)(const grpc_mdstr_hash_table_entry* entry, void* user_data), + void* user_data); + #endif /* GRPC_CORE_LIB_TRANSPORT_MDSTR_HASH_TABLE_H */ diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs index 961b911fbb..63c1d9cd00 100644 --- a/src/csharp/Grpc.Core/Server.cs +++ b/src/csharp/Grpc.Core/Server.cs @@ -344,8 +344,7 @@ namespace Grpc.Core { Logger.Warning(e, "Exception while handling RPC."); } - - if (continuation != null) + finally { continuation(); } diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs index 7a46a55a5b..5ba83b143e 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs @@ -195,8 +195,11 @@ namespace Grpc.IntegrationTesting case "status_code_and_message": await RunStatusCodeAndMessageAsync(client); break; + case "unimplemented_service": + RunUnimplementedService(new UnimplementedService.UnimplementedServiceClient(channel)); + break; case "unimplemented_method": - RunUnimplementedMethod(new UnimplementedService.UnimplementedServiceClient(channel)); + RunUnimplementedMethod(client); break; case "client_compressed_unary": RunClientCompressedUnary(client); @@ -577,7 +580,16 @@ namespace Grpc.IntegrationTesting Console.WriteLine("Passed!"); } - public static void RunUnimplementedMethod(UnimplementedService.UnimplementedServiceClient client) + public static void RunUnimplementedService(UnimplementedService.UnimplementedServiceClient client) + { + Console.WriteLine("running unimplemented_service"); + var e = Assert.Throws<RpcException>(() => client.UnimplementedCall(new Empty())); + + Assert.AreEqual(StatusCode.Unimplemented, e.Status.StatusCode); + Console.WriteLine("Passed!"); + } + + public static void RunUnimplementedMethod(TestService.TestServiceClient client) { Console.WriteLine("running unimplemented_method"); var e = Assert.Throws<RpcException>(() => client.UnimplementedCall(new Empty())); diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs index f907f630da..4960a53f92 100644 --- a/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/InteropClientServerTest.cs @@ -146,9 +146,15 @@ namespace Grpc.IntegrationTesting } [Test] + public void UnimplementedService() + { + InteropClient.RunUnimplementedService(new UnimplementedService.UnimplementedServiceClient(channel)); + } + + [Test] public void UnimplementedMethod() { - InteropClient.RunUnimplementedMethod(new UnimplementedService.UnimplementedServiceClient(channel)); + InteropClient.RunUnimplementedMethod(client); } } } diff --git a/src/csharp/Grpc.IntegrationTesting/Test.cs b/src/csharp/Grpc.IntegrationTesting/Test.cs index 88c2b8a921..d2fa9f8013 100644 --- a/src/csharp/Grpc.IntegrationTesting/Test.cs +++ b/src/csharp/Grpc.IntegrationTesting/Test.cs @@ -24,25 +24,28 @@ namespace Grpc.Testing { string.Concat( "CiFzcmMvcHJvdG8vZ3JwYy90ZXN0aW5nL3Rlc3QucHJvdG8SDGdycGMudGVz", "dGluZxoic3JjL3Byb3RvL2dycGMvdGVzdGluZy9lbXB0eS5wcm90bxolc3Jj", - "L3Byb3RvL2dycGMvdGVzdGluZy9tZXNzYWdlcy5wcm90bzK7BAoLVGVzdFNl", + "L3Byb3RvL2dycGMvdGVzdGluZy9tZXNzYWdlcy5wcm90bzLLBQoLVGVzdFNl", "cnZpY2USNQoJRW1wdHlDYWxsEhMuZ3JwYy50ZXN0aW5nLkVtcHR5GhMuZ3Jw", "Yy50ZXN0aW5nLkVtcHR5EkYKCVVuYXJ5Q2FsbBIbLmdycGMudGVzdGluZy5T", - "aW1wbGVSZXF1ZXN0GhwuZ3JwYy50ZXN0aW5nLlNpbXBsZVJlc3BvbnNlEmwK", - "E1N0cmVhbWluZ091dHB1dENhbGwSKC5ncnBjLnRlc3RpbmcuU3RyZWFtaW5n", - "T3V0cHV0Q2FsbFJlcXVlc3QaKS5ncnBjLnRlc3RpbmcuU3RyZWFtaW5nT3V0", - "cHV0Q2FsbFJlc3BvbnNlMAESaQoSU3RyZWFtaW5nSW5wdXRDYWxsEicuZ3Jw", - "Yy50ZXN0aW5nLlN0cmVhbWluZ0lucHV0Q2FsbFJlcXVlc3QaKC5ncnBjLnRl", - "c3RpbmcuU3RyZWFtaW5nSW5wdXRDYWxsUmVzcG9uc2UoARJpCg5GdWxsRHVw", - "bGV4Q2FsbBIoLmdycGMudGVzdGluZy5TdHJlYW1pbmdPdXRwdXRDYWxsUmVx", - "dWVzdBopLmdycGMudGVzdGluZy5TdHJlYW1pbmdPdXRwdXRDYWxsUmVzcG9u", - "c2UoATABEmkKDkhhbGZEdXBsZXhDYWxsEiguZ3JwYy50ZXN0aW5nLlN0cmVh", - "bWluZ091dHB1dENhbGxSZXF1ZXN0GikuZ3JwYy50ZXN0aW5nLlN0cmVhbWlu", - "Z091dHB1dENhbGxSZXNwb25zZSgBMAEyVQoUVW5pbXBsZW1lbnRlZFNlcnZp", - "Y2USPQoRVW5pbXBsZW1lbnRlZENhbGwSEy5ncnBjLnRlc3RpbmcuRW1wdHka", - "Ey5ncnBjLnRlc3RpbmcuRW1wdHkyiQEKEFJlY29ubmVjdFNlcnZpY2USOwoF", - "U3RhcnQSHS5ncnBjLnRlc3RpbmcuUmVjb25uZWN0UGFyYW1zGhMuZ3JwYy50", - "ZXN0aW5nLkVtcHR5EjgKBFN0b3ASEy5ncnBjLnRlc3RpbmcuRW1wdHkaGy5n", - "cnBjLnRlc3RpbmcuUmVjb25uZWN0SW5mb2IGcHJvdG8z")); + "aW1wbGVSZXF1ZXN0GhwuZ3JwYy50ZXN0aW5nLlNpbXBsZVJlc3BvbnNlEk8K", + "EkNhY2hlYWJsZVVuYXJ5Q2FsbBIbLmdycGMudGVzdGluZy5TaW1wbGVSZXF1", + "ZXN0GhwuZ3JwYy50ZXN0aW5nLlNpbXBsZVJlc3BvbnNlEmwKE1N0cmVhbWlu", + "Z091dHB1dENhbGwSKC5ncnBjLnRlc3RpbmcuU3RyZWFtaW5nT3V0cHV0Q2Fs", + "bFJlcXVlc3QaKS5ncnBjLnRlc3RpbmcuU3RyZWFtaW5nT3V0cHV0Q2FsbFJl", + "c3BvbnNlMAESaQoSU3RyZWFtaW5nSW5wdXRDYWxsEicuZ3JwYy50ZXN0aW5n", + "LlN0cmVhbWluZ0lucHV0Q2FsbFJlcXVlc3QaKC5ncnBjLnRlc3RpbmcuU3Ry", + "ZWFtaW5nSW5wdXRDYWxsUmVzcG9uc2UoARJpCg5GdWxsRHVwbGV4Q2FsbBIo", + "LmdycGMudGVzdGluZy5TdHJlYW1pbmdPdXRwdXRDYWxsUmVxdWVzdBopLmdy", + "cGMudGVzdGluZy5TdHJlYW1pbmdPdXRwdXRDYWxsUmVzcG9uc2UoATABEmkK", + "DkhhbGZEdXBsZXhDYWxsEiguZ3JwYy50ZXN0aW5nLlN0cmVhbWluZ091dHB1", + "dENhbGxSZXF1ZXN0GikuZ3JwYy50ZXN0aW5nLlN0cmVhbWluZ091dHB1dENh", + "bGxSZXNwb25zZSgBMAESPQoRVW5pbXBsZW1lbnRlZENhbGwSEy5ncnBjLnRl", + "c3RpbmcuRW1wdHkaEy5ncnBjLnRlc3RpbmcuRW1wdHkyVQoUVW5pbXBsZW1l", + "bnRlZFNlcnZpY2USPQoRVW5pbXBsZW1lbnRlZENhbGwSEy5ncnBjLnRlc3Rp", + "bmcuRW1wdHkaEy5ncnBjLnRlc3RpbmcuRW1wdHkyiQEKEFJlY29ubmVjdFNl", + "cnZpY2USOwoFU3RhcnQSHS5ncnBjLnRlc3RpbmcuUmVjb25uZWN0UGFyYW1z", + "GhMuZ3JwYy50ZXN0aW5nLkVtcHR5EjgKBFN0b3ASEy5ncnBjLnRlc3Rpbmcu", + "RW1wdHkaGy5ncnBjLnRlc3RpbmcuUmVjb25uZWN0SW5mb2IGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Grpc.Testing.EmptyReflection.Descriptor, global::Grpc.Testing.MessagesReflection.Descriptor, }, new pbr::GeneratedClrTypeInfo(null, null)); diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index 61f2ed4015..8d649bf5c5 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -71,6 +71,13 @@ namespace Grpc.Testing { __Marshaller_SimpleRequest, __Marshaller_SimpleResponse); + static readonly Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_CacheableUnaryCall = new Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( + MethodType.Unary, + __ServiceName, + "CacheableUnaryCall", + __Marshaller_SimpleRequest, + __Marshaller_SimpleResponse); + static readonly Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> __Method_StreamingOutputCall = new Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse>( MethodType.ServerStreaming, __ServiceName, @@ -99,6 +106,13 @@ namespace Grpc.Testing { __Marshaller_StreamingOutputCallRequest, __Marshaller_StreamingOutputCallResponse); + static readonly Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty> __Method_UnimplementedCall = new Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>( + MethodType.Unary, + __ServiceName, + "UnimplementedCall", + __Marshaller_Empty, + __Marshaller_Empty); + /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor { @@ -125,6 +139,16 @@ namespace Grpc.Testing { } /// <summary> + /// One request followed by one response. Response has cache control + /// headers set such that a caching HTTP proxy (such as GFE) can + /// satisfy subsequent requests. + /// </summary> + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> CacheableUnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context) + { + throw new RpcException(new Status(StatusCode.Unimplemented, "")); + } + + /// <summary> /// One request followed by a sequence of responses (streamed download). /// The server returns the payload with client desired type and sizes. /// </summary> @@ -163,6 +187,15 @@ namespace Grpc.Testing { throw new RpcException(new Status(StatusCode.Unimplemented, "")); } + /// <summary> + /// The test server will not implement this method. It will be used + /// to test the behavior when clients call unimplemented methods. + /// </summary> + public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context) + { + throw new RpcException(new Status(StatusCode.Unimplemented, "")); + } + } /// <summary>Client for TestService</summary> @@ -245,6 +278,42 @@ namespace Grpc.Testing { return CallInvoker.AsyncUnaryCall(__Method_UnaryCall, null, options, request); } /// <summary> + /// One request followed by one response. Response has cache control + /// headers set such that a caching HTTP proxy (such as GFE) can + /// satisfy subsequent requests. + /// </summary> + public virtual global::Grpc.Testing.SimpleResponse CacheableUnaryCall(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + return CacheableUnaryCall(request, new CallOptions(headers, deadline, cancellationToken)); + } + /// <summary> + /// One request followed by one response. Response has cache control + /// headers set such that a caching HTTP proxy (such as GFE) can + /// satisfy subsequent requests. + /// </summary> + public virtual global::Grpc.Testing.SimpleResponse CacheableUnaryCall(global::Grpc.Testing.SimpleRequest request, CallOptions options) + { + return CallInvoker.BlockingUnaryCall(__Method_CacheableUnaryCall, null, options, request); + } + /// <summary> + /// One request followed by one response. Response has cache control + /// headers set such that a caching HTTP proxy (such as GFE) can + /// satisfy subsequent requests. + /// </summary> + public virtual AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> CacheableUnaryCallAsync(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + return CacheableUnaryCallAsync(request, new CallOptions(headers, deadline, cancellationToken)); + } + /// <summary> + /// One request followed by one response. Response has cache control + /// headers set such that a caching HTTP proxy (such as GFE) can + /// satisfy subsequent requests. + /// </summary> + public virtual AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> CacheableUnaryCallAsync(global::Grpc.Testing.SimpleRequest request, CallOptions options) + { + return CallInvoker.AsyncUnaryCall(__Method_CacheableUnaryCall, null, options, request); + } + /// <summary> /// One request followed by a sequence of responses (streamed download). /// The server returns the payload with client desired type and sizes. /// </summary> @@ -314,6 +383,38 @@ namespace Grpc.Testing { { return CallInvoker.AsyncDuplexStreamingCall(__Method_HalfDuplexCall, null, options); } + /// <summary> + /// The test server will not implement this method. It will be used + /// to test the behavior when clients call unimplemented methods. + /// </summary> + public virtual global::Grpc.Testing.Empty UnimplementedCall(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + return UnimplementedCall(request, new CallOptions(headers, deadline, cancellationToken)); + } + /// <summary> + /// The test server will not implement this method. It will be used + /// to test the behavior when clients call unimplemented methods. + /// </summary> + public virtual global::Grpc.Testing.Empty UnimplementedCall(global::Grpc.Testing.Empty request, CallOptions options) + { + return CallInvoker.BlockingUnaryCall(__Method_UnimplementedCall, null, options, request); + } + /// <summary> + /// The test server will not implement this method. It will be used + /// to test the behavior when clients call unimplemented methods. + /// </summary> + public virtual AsyncUnaryCall<global::Grpc.Testing.Empty> UnimplementedCallAsync(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + return UnimplementedCallAsync(request, new CallOptions(headers, deadline, cancellationToken)); + } + /// <summary> + /// The test server will not implement this method. It will be used + /// to test the behavior when clients call unimplemented methods. + /// </summary> + public virtual AsyncUnaryCall<global::Grpc.Testing.Empty> UnimplementedCallAsync(global::Grpc.Testing.Empty request, CallOptions options) + { + return CallInvoker.AsyncUnaryCall(__Method_UnimplementedCall, null, options, request); + } /// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary> protected override TestServiceClient NewInstance(ClientBaseConfiguration configuration) { @@ -327,10 +428,12 @@ namespace Grpc.Testing { return ServerServiceDefinition.CreateBuilder() .AddMethod(__Method_EmptyCall, serviceImpl.EmptyCall) .AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall) + .AddMethod(__Method_CacheableUnaryCall, serviceImpl.CacheableUnaryCall) .AddMethod(__Method_StreamingOutputCall, serviceImpl.StreamingOutputCall) .AddMethod(__Method_StreamingInputCall, serviceImpl.StreamingInputCall) .AddMethod(__Method_FullDuplexCall, serviceImpl.FullDuplexCall) - .AddMethod(__Method_HalfDuplexCall, serviceImpl.HalfDuplexCall).Build(); + .AddMethod(__Method_HalfDuplexCall, serviceImpl.HalfDuplexCall) + .AddMethod(__Method_UnimplementedCall, serviceImpl.UnimplementedCall).Build(); } } diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index e8f2d37bd8..a59a66b2aa 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -375,7 +375,8 @@ function statusCodeAndMessage(client, done) { duplex.end(); } -function unimplementedMethod(client, done) { +// NOTE: the client param to this function is from UnimplementedService +function unimplementedService(client, done) { client.unimplementedCall({}, function(err, resp) { assert(err); assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); @@ -384,6 +385,15 @@ function unimplementedMethod(client, done) { }); } +// NOTE: the client param to this function is from TestService +function unimplementedMethod(client, done) { + client.unimplementedCall({}, function(err, resp) { + assert(err); + assert.strictEqual(err.code, grpc.status.UNIMPLEMENTED); + done(); + }); +} + /** * Run one of the authentication tests. * @param {string} expected_user The expected username in the response @@ -527,8 +537,10 @@ var test_cases = { Client: testProto.TestService}, status_code_and_message: {run: statusCodeAndMessage, Client: testProto.TestService}, - unimplemented_method: {run: unimplementedMethod, + unimplemented_service: {run: unimplementedService, Client: testProto.UnimplementedService}, + unimplemented_method: {run: unimplementedMethod, + Client: testProto.TestService}, compute_engine_creds: {run: computeEngineCreds, Client: testProto.TestService, getCreds: getApplicationCreds}, diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index f008a87585..58f8842c0d 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -98,6 +98,10 @@ describe('Interop tests', function() { interop_client.runTest(port, name_override, 'status_code_and_message', true, true, done); }); + it('should pass unimplemented_service', function(done) { + interop_client.runTest(port, name_override, 'unimplemented_service', + true, true, done); + }); it('should pass unimplemented_method', function(done) { interop_client.runTest(port, name_override, 'unimplemented_method', true, true, done); |