diff options
Diffstat (limited to 'src')
21 files changed, 165 insertions, 119 deletions
diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc index 8de0997ebe..5178115e44 100644 --- a/src/compiler/objective_c_plugin.cc +++ b/src/compiler/objective_c_plugin.cc @@ -68,6 +68,7 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { ::grpc::string imports = ::grpc::string("#import \"") + file_name + ".pbobjc.h\"\n\n" "#import <ProtoRPC/ProtoService.h>\n" + "#import <ProtoRPC/ProtoRPC.h>\n" "#import <RxLibrary/GRXWriteable.h>\n" "#import <RxLibrary/GRXWriter.h>\n"; diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index ce9abdad61..8d28e829d8 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -96,17 +96,10 @@ static void method_parameters_unref(method_parameters *method_params) { } } -static void *method_parameters_copy(void *value) { - return method_parameters_ref(value); -} - static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { method_parameters_unref(value); } -static const grpc_slice_hash_table_vtable method_parameters_vtable = { - method_parameters_free, method_parameters_copy}; - static bool parse_wait_for_ready(grpc_json *field, wait_for_ready_value *wait_for_ready) { if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { @@ -472,7 +465,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, - &method_parameters_vtable); + method_parameters_free); grpc_service_config_destroy(service_config); } } diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index a271d05ca8..f8524732df 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -750,18 +750,11 @@ static void destroy_balancer_name(grpc_exec_ctx *exec_ctx, gpr_free(balancer_name); } -static void *copy_balancer_name(void *balancer_name) { - return gpr_strdup(balancer_name); -} - static grpc_slice_hash_table_entry targets_info_entry_create( const char *address, const char *balancer_name) { - static const grpc_slice_hash_table_vtable vtable = {destroy_balancer_name, - copy_balancer_name}; grpc_slice_hash_table_entry entry; entry.key = grpc_slice_from_copied_string(address); - entry.value = (void *)balancer_name; - entry.vtable = &vtable; + entry.value = gpr_strdup(balancer_name); return entry; } @@ -825,11 +818,8 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, uri_path); gpr_free(uri_path); - *targets_info = - grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries); - for (size_t i = 0; i < num_grpclb_addrs; i++) { - grpc_slice_unref_internal(exec_ctx, targets_info_entries[i].key); - } + *targets_info = grpc_slice_hash_table_create( + num_grpclb_addrs, targets_info_entries, destroy_balancer_name); gpr_free(targets_info_entries); return target_uri_str; diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index f414a60eee..4f5f41e9b0 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -49,8 +49,6 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" -int grpc_compression_trace = 0; - #define INITIAL_METADATA_UNSEEN 0 #define HAS_COMPRESSION_ALGORITHM 2 #define NO_COMPRESSION_ALGORITHM 4 diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c index e3ffc41f90..b615116965 100644 --- a/src/core/ext/filters/message_size/message_size_filter.c +++ b/src/core/ext/filters/message_size/message_size_filter.c @@ -50,19 +50,10 @@ typedef struct message_size_limits { int max_recv_size; } message_size_limits; -static void* message_size_limits_copy(void* value) { - void* new_value = gpr_malloc(sizeof(message_size_limits)); - memcpy(new_value, value, sizeof(message_size_limits)); - return new_value; -} - static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) { gpr_free(value); } -static const grpc_slice_hash_table_vtable message_size_limits_vtable = { - message_size_limits_free, message_size_limits_copy}; - static void* message_size_limits_create_from_json(const grpc_json* json) { int max_request_message_bytes = -1; int max_response_message_bytes = -1; @@ -257,7 +248,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, chand->method_limit_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, message_size_limits_create_from_json, - &message_size_limits_vtable); + message_size_limits_free); grpc_service_config_destroy(service_config); } } diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c index 219567f36f..444f22aa19 100644 --- a/src/core/lib/slice/slice_hash_table.c +++ b/src/core/lib/slice/slice_hash_table.c @@ -42,56 +42,47 @@ struct grpc_slice_hash_table { gpr_refcount refs; + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value); size_t size; + size_t max_num_probes; grpc_slice_hash_table_entry* entries; }; static bool is_empty(grpc_slice_hash_table_entry* entry) { - return entry->vtable == NULL; + return entry->value == NULL; } -// Helper function for insert and get operations that performs quadratic -// probing (https://en.wikipedia.org/wiki/Quadratic_probing). -static size_t grpc_slice_hash_table_find_index( - const grpc_slice_hash_table* table, const grpc_slice key, bool find_empty) { - size_t hash = grpc_slice_hash(key); - for (size_t i = 0; i < table->size; ++i) { - const size_t idx = (hash + i * i) % table->size; +static void grpc_slice_hash_table_add(grpc_slice_hash_table* table, + grpc_slice key, void* value) { + GPR_ASSERT(value != NULL); + const size_t hash = grpc_slice_hash(key); + for (size_t offset = 0; offset < table->size; ++offset) { + const size_t idx = (hash + offset) % table->size; if (is_empty(&table->entries[idx])) { - return find_empty ? idx : table->size; - } - if (grpc_slice_eq(table->entries[idx].key, key)) { - return idx; + table->entries[idx].key = key; + table->entries[idx].value = value; + // Keep track of the maximum number of probes needed, since this + // provides an upper bound for lookups. + if (offset > table->max_num_probes) table->max_num_probes = offset; + return; } } - return table->size; // Not found. -} - -static void grpc_slice_hash_table_add( - grpc_slice_hash_table* table, grpc_slice key, void* value, - const grpc_slice_hash_table_vtable* vtable) { - GPR_ASSERT(value != NULL); - const size_t idx = - grpc_slice_hash_table_find_index(table, key, true /* find_empty */); - GPR_ASSERT(idx != table->size); // Table should never be full. - grpc_slice_hash_table_entry* entry = &table->entries[idx]; - entry->key = grpc_slice_ref_internal(key); - entry->value = vtable->copy_value(value); - entry->vtable = vtable; + GPR_ASSERT(false); // Table should never be full. } grpc_slice_hash_table* grpc_slice_hash_table_create( - size_t num_entries, grpc_slice_hash_table_entry* entries) { + size_t num_entries, grpc_slice_hash_table_entry* entries, + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) { grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table)); gpr_ref_init(&table->refs, 1); - // Quadratic probing gets best performance when the table is no more - // than half full. + table->destroy_value = destroy_value; + // Keep load factor low to improve performance of lookups. table->size = num_entries * 2; const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size; table->entries = gpr_zalloc(entry_size); for (size_t i = 0; i < num_entries; ++i) { grpc_slice_hash_table_entry* entry = &entries[i]; - grpc_slice_hash_table_add(table, entry->key, entry->value, entry->vtable); + grpc_slice_hash_table_add(table, entry->key, entry->value); } return table; } @@ -108,7 +99,7 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx, grpc_slice_hash_table_entry* entry = &table->entries[i]; if (!is_empty(entry)) { grpc_slice_unref_internal(exec_ctx, entry->key); - entry->vtable->destroy_value(exec_ctx, entry->value); + table->destroy_value(exec_ctx, entry->value); } } gpr_free(table->entries); @@ -118,8 +109,15 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx, void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table, const grpc_slice key) { - const size_t idx = - grpc_slice_hash_table_find_index(table, key, false /* find_empty */); - if (idx == table->size) return NULL; // Not found. - return table->entries[idx].value; + const size_t hash = grpc_slice_hash(key); + // We cap the number of probes at the max number recorded when + // populating the table. + for (size_t offset = 0; offset <= table->max_num_probes; ++offset) { + const size_t idx = (hash + offset) % table->size; + if (is_empty(&table->entries[idx])) break; + if (grpc_slice_eq(table->entries[idx].key, key)) { + return table->entries[idx].value; + } + } + return NULL; // Not found. } diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h index d0c27122d7..1e61c5eb11 100644 --- a/src/core/lib/slice/slice_hash_table.h +++ b/src/core/lib/slice/slice_hash_table.h @@ -37,33 +37,28 @@ /** Hash table implementation. * * This implementation uses open addressing - * (https://en.wikipedia.org/wiki/Open_addressing) with quadratic - * probing (https://en.wikipedia.org/wiki/Quadratic_probing). + * (https://en.wikipedia.org/wiki/Open_addressing) with linear + * probing (https://en.wikipedia.org/wiki/Linear_probing). * * The keys are \a grpc_slice objects. The values are arbitrary pointers - * with a common vtable. + * with a common destroy function. * * Hash tables are intentionally immutable, to avoid the need for locking. */ typedef struct grpc_slice_hash_table grpc_slice_hash_table; -typedef struct grpc_slice_hash_table_vtable { - void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value); - void *(*copy_value)(void *value); -} grpc_slice_hash_table_vtable; - typedef struct grpc_slice_hash_table_entry { grpc_slice key; void *value; /* Must not be NULL. */ - const grpc_slice_hash_table_vtable *vtable; } grpc_slice_hash_table_entry; /** Creates a new hash table of containing \a entries, which is an array - of length \a num_entries. - Creates its own copy of all keys and values from \a entries. */ + of length \a num_entries. Takes ownership of all keys and values in + \a entries. Values will be cleaned up via \a destroy_value(). */ grpc_slice_hash_table *grpc_slice_hash_table_create( - size_t num_entries, grpc_slice_hash_table_entry *entries); + size_t num_entries, grpc_slice_hash_table_entry *entries, + void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value)); grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table); void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c index c481a3e0dc..dfbd3fb125 100644 --- a/src/core/lib/support/stack_lockfree.c +++ b/src/core/lib/support/stack_lockfree.c @@ -76,13 +76,13 @@ struct gpr_stack_lockfree { gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { gpr_stack_lockfree *stack; - stack = gpr_malloc(sizeof(*stack)); + stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack)); /* Since we only allocate 16 bits to represent an entry number, * make sure that we are within the desired range */ /* Reserve the highest entry number as a dummy */ GPR_ASSERT(entries < INVALID_ENTRY_INDEX); - stack->entries = gpr_malloc_aligned(entries * sizeof(stack->entries[0]), - ENTRY_ALIGNMENT_BITS); + stack->entries = (lockfree_node *)gpr_malloc_aligned( + entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); /* Clear out all entries */ memset(stack->entries, 0, entries * sizeof(stack->entries[0])); memset(&stack->head, 0, sizeof(stack->head)); diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c index a69c501e9f..9bfec7782a 100644 --- a/src/core/lib/support/time_posix.c +++ b/src/core/lib/support/time_posix.c @@ -42,6 +42,7 @@ #ifdef __linux__ #include <sys/syscall.h> #endif +#include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/time.h> #include "src/core/lib/support/block_annotate.h" @@ -144,7 +145,14 @@ static gpr_timespec now_impl(gpr_clock_type clock) { gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl; +#ifdef GPR_LOW_LEVEL_COUNTERS +gpr_atm gpr_now_call_count; +#endif + gpr_timespec gpr_now(gpr_clock_type clock_type) { +#ifdef GPR_LOW_LEVEL_COUNTERS + __atomic_fetch_add(&gpr_now_call_count, 1, __ATOMIC_RELAXED); +#endif return gpr_now_impl(clock_type); } diff --git a/src/core/lib/support/tmpfile_posix.c b/src/core/lib/support/tmpfile_posix.c index 0cd4bb6fc3..5771c158e0 100644 --- a/src/core/lib/support/tmpfile_posix.c +++ b/src/core/lib/support/tmpfile_posix.c @@ -50,34 +50,34 @@ FILE *gpr_tmpfile(const char *prefix, char **tmp_filename) { FILE *result = NULL; - char *template; + char *filename_template; int fd; if (tmp_filename != NULL) *tmp_filename = NULL; - gpr_asprintf(&template, "/tmp/%s_XXXXXX", prefix); - GPR_ASSERT(template != NULL); + gpr_asprintf(&filename_template, "/tmp/%s_XXXXXX", prefix); + GPR_ASSERT(filename_template != NULL); - fd = mkstemp(template); + fd = mkstemp(filename_template); if (fd == -1) { - gpr_log(GPR_ERROR, "mkstemp failed for template %s with error %s.", - template, strerror(errno)); + gpr_log(GPR_ERROR, "mkstemp failed for filename_template %s with error %s.", + filename_template, strerror(errno)); goto end; } result = fdopen(fd, "w+"); if (result == NULL) { gpr_log(GPR_ERROR, "Could not open file %s from fd %d (error = %s).", - template, fd, strerror(errno)); - unlink(template); + filename_template, fd, strerror(errno)); + unlink(filename_template); close(fd); goto end; } end: if (result != NULL && tmp_filename != NULL) { - *tmp_filename = template; + *tmp_filename = filename_template; } else { - gpr_free(template); + gpr_free(filename_template); } return result; } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 2581c1c71f..8a40378a19 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -245,6 +245,7 @@ struct grpc_call { }; int grpc_call_error_trace = 0; +int grpc_compression_trace = 0; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index ef14124fef..430cbd90ac 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -1033,8 +1033,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { grpc_server *server = gpr_zalloc(sizeof(grpc_server)); - GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); - gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); gpr_cv_init(&server->starting_cv); diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 1195f75044..6aecb7fa93 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -162,7 +162,6 @@ static char* parse_json_method_name(grpc_json* json) { static bool parse_json_method_config( grpc_exec_ctx* exec_ctx, grpc_json* json, void* (*create_value)(const grpc_json* method_config_json), - const grpc_slice_hash_table_vtable* vtable, grpc_slice_hash_table_entry* entries, size_t* idx) { // Construct value. void* method_config = create_value(json); @@ -185,13 +184,11 @@ static bool parse_json_method_config( // Add entry for each path. for (size_t i = 0; i < paths.count; ++i) { entries[*idx].key = grpc_slice_from_copied_string(paths.strs[i]); - entries[*idx].value = vtable->copy_value(method_config); - entries[*idx].vtable = vtable; + entries[*idx].value = method_config; ++*idx; } success = true; done: - vtable->destroy_value(exec_ctx, method_config); gpr_strvec_destroy(&paths); return success; } @@ -199,7 +196,7 @@ done: grpc_slice_hash_table* grpc_service_config_create_method_config_table( grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config, void* (*create_value)(const grpc_json* method_config_json), - const grpc_slice_hash_table_vtable* vtable) { + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) { const grpc_json* json = service_config->json_tree; // Traverse parsed JSON tree. if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL; @@ -220,8 +217,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( size_t idx = 0; for (grpc_json* method = field->child; method != NULL; method = method->next) { - if (!parse_json_method_config(exec_ctx, method, create_value, vtable, - entries, &idx)) { + if (!parse_json_method_config(exec_ctx, method, create_value, entries, + &idx)) { return NULL; } } @@ -231,12 +228,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( // Instantiate method config table. grpc_slice_hash_table* method_config_table = NULL; if (entries != NULL) { - method_config_table = grpc_slice_hash_table_create(num_entries, entries); - // Clean up. - for (size_t i = 0; i < num_entries; ++i) { - grpc_slice_unref_internal(exec_ctx, entries[i].key); - vtable->destroy_value(exec_ctx, entries[i].value); - } + method_config_table = + grpc_slice_hash_table_create(num_entries, entries, destroy_value); gpr_free(entries); } return method_config_table; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index ebfc59b534..e0548b9c3f 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -57,12 +57,12 @@ const char* grpc_service_config_get_lb_policy_name( /// Creates a method config table based on the data in \a json. /// The table's keys are request paths. The table's value type is /// returned by \a create_value(), based on data parsed from the JSON tree. -/// \a vtable provides methods used to manage the values. +/// \a destroy_value is used to clean up values. /// Returns NULL on error. grpc_slice_hash_table* grpc_service_config_create_method_config_table( grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config, void* (*create_value)(const grpc_json* method_config_json), - const grpc_slice_hash_table_vtable* vtable); + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)); /// A helper function for looking up values in the table returned by /// \a grpc_service_config_create_method_config_table(). diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h index 7645bb1d34..5e9324c445 100644 --- a/src/objective-c/GRPCClient/GRPCCall.h +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -253,6 +253,13 @@ extern id const kGRPCTrailersKey; */ + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path; +/** + * Set the dispatch queue to be used for callbacks. + * + * This configuration is only effective before the call starts. + */ +- (void)setResponseDispatchQueue:(dispatch_queue_t)queue; + // TODO(jcanizales): Let specify a deadline. As a category of GRXWriter? @end diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 051138ea4d..f9d13fea57 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -113,6 +113,10 @@ static NSMutableDictionary *callFlags; // the SendClose op is added. BOOL _unaryCall; NSMutableArray *_unaryOpBatch; + + // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch + // queue + dispatch_queue_t _responseQueue; } @synthesize state = _state; @@ -175,10 +179,19 @@ static NSMutableDictionary *callFlags; _unaryCall = YES; _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch]; } + + _responseQueue = dispatch_get_main_queue(); } return self; } +- (void)setResponseDispatchQueue:(dispatch_queue_t)queue { + if (_state != GRXWriterStateNotStarted) { + return; + } + _responseQueue = queue; +} + #pragma mark Finish - (void)finishWithError:(NSError *)errorOrNil { @@ -424,7 +437,8 @@ static NSMutableDictionary *callFlags; // that the life of the instance is determined by this retain cycle. _retainSelf = self; - _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; + _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable + dispatchQueue:_responseQueue]; _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path]; NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?"); diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h index b2775f98b5..07004f6d4d 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h @@ -53,7 +53,9 @@ * The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released * after that. */ -- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable NS_DESIGNATED_INITIALIZER; +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable + dispatchQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER; +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable; /** * Enqueues writeValue: to be sent to the writeable in the main thread. diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m index 08bd079aea..88aa7a7282 100644 --- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m +++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m @@ -51,14 +51,20 @@ } // Designated initializer -- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable + dispatchQueue:(dispatch_queue_t)queue { if (self = [super init]) { - _writeableQueue = dispatch_get_main_queue(); + _writeableQueue = queue; _writeable = writeable; } return self; } +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { + return [self initWithWriteable:writeable + dispatchQueue:dispatch_get_main_queue()]; +} + - (void)enqueueValue:(id)value completionHandler:(void (^)())handler { dispatch_async(_writeableQueue, ^{ // We're racing a possible cancellation performed by another thread. To turn all already- diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 76c15003f6..e36f5c3ee9 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -353,4 +353,59 @@ static GRPCProtoMethod *kUnaryCallMethod; [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; } +- (void)testAlternateDispatchQueue { + const int32_t kPayloadSize = 100; + RMTSimpleRequest *request = [RMTSimpleRequest message]; + request.responseSize = kPayloadSize; + + __weak XCTestExpectation *expectation1 = [self expectationWithDescription:@"AlternateDispatchQueue1"]; + + // Use default (main) dispatch queue + NSString *main_queue_label = [NSString stringWithUTF8String:dispatch_queue_get_label(dispatch_get_main_queue())]; + + GRXWriter *requestsWriter1 = [GRXWriter writerWithValue:[request data]]; + + GRPCCall *call1 = [[GRPCCall alloc] initWithHost:kHostAddress + path:kUnaryCallMethod.HTTPPath + requestsWriter:requestsWriter1]; + + id<GRXWriteable> responsesWriteable1 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { + NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)]; + XCTAssert([label isEqualToString:main_queue_label]); + + [expectation1 fulfill]; + } completionHandler:^(NSError *errorOrNil) { + }]; + + [call1 startWithWriteable:responsesWriteable1]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; + + // Use a custom queue + __weak XCTestExpectation *expectation2 = [self expectationWithDescription:@"AlternateDispatchQueue2"]; + + NSString *queue_label = @"test.queue1"; + dispatch_queue_t queue = dispatch_queue_create([queue_label UTF8String], DISPATCH_QUEUE_SERIAL); + + GRXWriter *requestsWriter2 = [GRXWriter writerWithValue:[request data]]; + + GRPCCall *call2 = [[GRPCCall alloc] initWithHost:kHostAddress + path:kUnaryCallMethod.HTTPPath + requestsWriter:requestsWriter2]; + + [call2 setResponseDispatchQueue:queue]; + + id<GRXWriteable> responsesWriteable2 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { + NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)]; + XCTAssert([label isEqualToString:queue_label]); + + [expectation2 fulfill]; + } completionHandler:^(NSError *errorOrNil) { + }]; + + [call2 startWithWriteable:responsesWriteable2]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; +} + @end diff --git a/src/proto/grpc/status/BUILD b/src/proto/grpc/status/BUILD index c17f87eb3d..71363bd1b6 100644 --- a/src/proto/grpc/status/BUILD +++ b/src/proto/grpc/status/BUILD @@ -37,7 +37,5 @@ grpc_proto_library( name = "status_proto", srcs = ["status.proto"], has_services = False, - well_known_protos = "@submodule_protobuf//:well_known_protos", + well_known_protos = "@com_google_protobuf//:well_known_protos", ) - - diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py index b184e62cfd..90f54e80bb 100644 --- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py +++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py @@ -96,8 +96,6 @@ def _rst_during_data(stub): def _rst_after_data(stub): resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST) - _validate_payload_type_and_length( - next(resp_future), messages_pb2.COMPRESSABLE, _RESPONSE_SIZE) _validate_status_code_and_details(resp_future, grpc.StatusCode.INTERNAL, "Received RST_STREAM with error code 0") |