aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/objective_c_plugin.cc1
-rw-r--r--src/core/ext/filters/client_channel/client_channel.c9
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c16
-rw-r--r--src/core/ext/filters/http/message_compress/message_compress_filter.c2
-rw-r--r--src/core/ext/filters/message_size/message_size_filter.c11
-rw-r--r--src/core/lib/slice/slice_hash_table.c68
-rw-r--r--src/core/lib/slice/slice_hash_table.h19
-rw-r--r--src/core/lib/support/stack_lockfree.c6
-rw-r--r--src/core/lib/support/time_posix.c8
-rw-r--r--src/core/lib/support/tmpfile_posix.c20
-rw-r--r--src/core/lib/surface/call.c1
-rw-r--r--src/core/lib/surface/server.c2
-rw-r--r--src/core/lib/transport/service_config.c19
-rw-r--r--src/core/lib/transport/service_config.h4
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.h7
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m16
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.h4
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m10
-rw-r--r--src/objective-c/tests/GRPCClientTests.m55
-rw-r--r--src/proto/grpc/status/BUILD4
-rw-r--r--src/python/grpcio_tests/tests/http2/negative_http2_client.py2
21 files changed, 165 insertions, 119 deletions
diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc
index 8de0997ebe..5178115e44 100644
--- a/src/compiler/objective_c_plugin.cc
+++ b/src/compiler/objective_c_plugin.cc
@@ -68,6 +68,7 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
::grpc::string imports = ::grpc::string("#import \"") + file_name +
".pbobjc.h\"\n\n"
"#import <ProtoRPC/ProtoService.h>\n"
+ "#import <ProtoRPC/ProtoRPC.h>\n"
"#import <RxLibrary/GRXWriteable.h>\n"
"#import <RxLibrary/GRXWriter.h>\n";
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c
index ce9abdad61..8d28e829d8 100644
--- a/src/core/ext/filters/client_channel/client_channel.c
+++ b/src/core/ext/filters/client_channel/client_channel.c
@@ -96,17 +96,10 @@ static void method_parameters_unref(method_parameters *method_params) {
}
}
-static void *method_parameters_copy(void *value) {
- return method_parameters_ref(value);
-}
-
static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) {
method_parameters_unref(value);
}
-static const grpc_slice_hash_table_vtable method_parameters_vtable = {
- method_parameters_free, method_parameters_copy};
-
static bool parse_wait_for_ready(grpc_json *field,
wait_for_ready_value *wait_for_ready) {
if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) {
@@ -472,7 +465,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
grpc_uri_destroy(uri);
method_params_table = grpc_service_config_create_method_config_table(
exec_ctx, service_config, method_parameters_create_from_json,
- &method_parameters_vtable);
+ method_parameters_free);
grpc_service_config_destroy(service_config);
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index a271d05ca8..f8524732df 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -750,18 +750,11 @@ static void destroy_balancer_name(grpc_exec_ctx *exec_ctx,
gpr_free(balancer_name);
}
-static void *copy_balancer_name(void *balancer_name) {
- return gpr_strdup(balancer_name);
-}
-
static grpc_slice_hash_table_entry targets_info_entry_create(
const char *address, const char *balancer_name) {
- static const grpc_slice_hash_table_vtable vtable = {destroy_balancer_name,
- copy_balancer_name};
grpc_slice_hash_table_entry entry;
entry.key = grpc_slice_from_copied_string(address);
- entry.value = (void *)balancer_name;
- entry.vtable = &vtable;
+ entry.value = gpr_strdup(balancer_name);
return entry;
}
@@ -825,11 +818,8 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx,
uri_path);
gpr_free(uri_path);
- *targets_info =
- grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries);
- for (size_t i = 0; i < num_grpclb_addrs; i++) {
- grpc_slice_unref_internal(exec_ctx, targets_info_entries[i].key);
- }
+ *targets_info = grpc_slice_hash_table_create(
+ num_grpclb_addrs, targets_info_entries, destroy_balancer_name);
gpr_free(targets_info_entries);
return target_uri_str;
diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c
index f414a60eee..4f5f41e9b0 100644
--- a/src/core/ext/filters/http/message_compress/message_compress_filter.c
+++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c
@@ -49,8 +49,6 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
-int grpc_compression_trace = 0;
-
#define INITIAL_METADATA_UNSEEN 0
#define HAS_COMPRESSION_ALGORITHM 2
#define NO_COMPRESSION_ALGORITHM 4
diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c
index e3ffc41f90..b615116965 100644
--- a/src/core/ext/filters/message_size/message_size_filter.c
+++ b/src/core/ext/filters/message_size/message_size_filter.c
@@ -50,19 +50,10 @@ typedef struct message_size_limits {
int max_recv_size;
} message_size_limits;
-static void* message_size_limits_copy(void* value) {
- void* new_value = gpr_malloc(sizeof(message_size_limits));
- memcpy(new_value, value, sizeof(message_size_limits));
- return new_value;
-}
-
static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) {
gpr_free(value);
}
-static const grpc_slice_hash_table_vtable message_size_limits_vtable = {
- message_size_limits_free, message_size_limits_copy};
-
static void* message_size_limits_create_from_json(const grpc_json* json) {
int max_request_message_bytes = -1;
int max_response_message_bytes = -1;
@@ -257,7 +248,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx,
chand->method_limit_table =
grpc_service_config_create_method_config_table(
exec_ctx, service_config, message_size_limits_create_from_json,
- &message_size_limits_vtable);
+ message_size_limits_free);
grpc_service_config_destroy(service_config);
}
}
diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c
index 219567f36f..444f22aa19 100644
--- a/src/core/lib/slice/slice_hash_table.c
+++ b/src/core/lib/slice/slice_hash_table.c
@@ -42,56 +42,47 @@
struct grpc_slice_hash_table {
gpr_refcount refs;
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value);
size_t size;
+ size_t max_num_probes;
grpc_slice_hash_table_entry* entries;
};
static bool is_empty(grpc_slice_hash_table_entry* entry) {
- return entry->vtable == NULL;
+ return entry->value == NULL;
}
-// Helper function for insert and get operations that performs quadratic
-// probing (https://en.wikipedia.org/wiki/Quadratic_probing).
-static size_t grpc_slice_hash_table_find_index(
- const grpc_slice_hash_table* table, const grpc_slice key, bool find_empty) {
- size_t hash = grpc_slice_hash(key);
- for (size_t i = 0; i < table->size; ++i) {
- const size_t idx = (hash + i * i) % table->size;
+static void grpc_slice_hash_table_add(grpc_slice_hash_table* table,
+ grpc_slice key, void* value) {
+ GPR_ASSERT(value != NULL);
+ const size_t hash = grpc_slice_hash(key);
+ for (size_t offset = 0; offset < table->size; ++offset) {
+ const size_t idx = (hash + offset) % table->size;
if (is_empty(&table->entries[idx])) {
- return find_empty ? idx : table->size;
- }
- if (grpc_slice_eq(table->entries[idx].key, key)) {
- return idx;
+ table->entries[idx].key = key;
+ table->entries[idx].value = value;
+ // Keep track of the maximum number of probes needed, since this
+ // provides an upper bound for lookups.
+ if (offset > table->max_num_probes) table->max_num_probes = offset;
+ return;
}
}
- return table->size; // Not found.
-}
-
-static void grpc_slice_hash_table_add(
- grpc_slice_hash_table* table, grpc_slice key, void* value,
- const grpc_slice_hash_table_vtable* vtable) {
- GPR_ASSERT(value != NULL);
- const size_t idx =
- grpc_slice_hash_table_find_index(table, key, true /* find_empty */);
- GPR_ASSERT(idx != table->size); // Table should never be full.
- grpc_slice_hash_table_entry* entry = &table->entries[idx];
- entry->key = grpc_slice_ref_internal(key);
- entry->value = vtable->copy_value(value);
- entry->vtable = vtable;
+ GPR_ASSERT(false); // Table should never be full.
}
grpc_slice_hash_table* grpc_slice_hash_table_create(
- size_t num_entries, grpc_slice_hash_table_entry* entries) {
+ size_t num_entries, grpc_slice_hash_table_entry* entries,
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) {
grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table));
gpr_ref_init(&table->refs, 1);
- // Quadratic probing gets best performance when the table is no more
- // than half full.
+ table->destroy_value = destroy_value;
+ // Keep load factor low to improve performance of lookups.
table->size = num_entries * 2;
const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size;
table->entries = gpr_zalloc(entry_size);
for (size_t i = 0; i < num_entries; ++i) {
grpc_slice_hash_table_entry* entry = &entries[i];
- grpc_slice_hash_table_add(table, entry->key, entry->value, entry->vtable);
+ grpc_slice_hash_table_add(table, entry->key, entry->value);
}
return table;
}
@@ -108,7 +99,7 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx,
grpc_slice_hash_table_entry* entry = &table->entries[i];
if (!is_empty(entry)) {
grpc_slice_unref_internal(exec_ctx, entry->key);
- entry->vtable->destroy_value(exec_ctx, entry->value);
+ table->destroy_value(exec_ctx, entry->value);
}
}
gpr_free(table->entries);
@@ -118,8 +109,15 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx,
void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table,
const grpc_slice key) {
- const size_t idx =
- grpc_slice_hash_table_find_index(table, key, false /* find_empty */);
- if (idx == table->size) return NULL; // Not found.
- return table->entries[idx].value;
+ const size_t hash = grpc_slice_hash(key);
+ // We cap the number of probes at the max number recorded when
+ // populating the table.
+ for (size_t offset = 0; offset <= table->max_num_probes; ++offset) {
+ const size_t idx = (hash + offset) % table->size;
+ if (is_empty(&table->entries[idx])) break;
+ if (grpc_slice_eq(table->entries[idx].key, key)) {
+ return table->entries[idx].value;
+ }
+ }
+ return NULL; // Not found.
}
diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h
index d0c27122d7..1e61c5eb11 100644
--- a/src/core/lib/slice/slice_hash_table.h
+++ b/src/core/lib/slice/slice_hash_table.h
@@ -37,33 +37,28 @@
/** Hash table implementation.
*
* This implementation uses open addressing
- * (https://en.wikipedia.org/wiki/Open_addressing) with quadratic
- * probing (https://en.wikipedia.org/wiki/Quadratic_probing).
+ * (https://en.wikipedia.org/wiki/Open_addressing) with linear
+ * probing (https://en.wikipedia.org/wiki/Linear_probing).
*
* The keys are \a grpc_slice objects. The values are arbitrary pointers
- * with a common vtable.
+ * with a common destroy function.
*
* Hash tables are intentionally immutable, to avoid the need for locking.
*/
typedef struct grpc_slice_hash_table grpc_slice_hash_table;
-typedef struct grpc_slice_hash_table_vtable {
- void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value);
- void *(*copy_value)(void *value);
-} grpc_slice_hash_table_vtable;
-
typedef struct grpc_slice_hash_table_entry {
grpc_slice key;
void *value; /* Must not be NULL. */
- const grpc_slice_hash_table_vtable *vtable;
} grpc_slice_hash_table_entry;
/** Creates a new hash table of containing \a entries, which is an array
- of length \a num_entries.
- Creates its own copy of all keys and values from \a entries. */
+ of length \a num_entries. Takes ownership of all keys and values in
+ \a entries. Values will be cleaned up via \a destroy_value(). */
grpc_slice_hash_table *grpc_slice_hash_table_create(
- size_t num_entries, grpc_slice_hash_table_entry *entries);
+ size_t num_entries, grpc_slice_hash_table_entry *entries,
+ void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value));
grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table);
void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c
index c481a3e0dc..dfbd3fb125 100644
--- a/src/core/lib/support/stack_lockfree.c
+++ b/src/core/lib/support/stack_lockfree.c
@@ -76,13 +76,13 @@ struct gpr_stack_lockfree {
gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) {
gpr_stack_lockfree *stack;
- stack = gpr_malloc(sizeof(*stack));
+ stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack));
/* Since we only allocate 16 bits to represent an entry number,
* make sure that we are within the desired range */
/* Reserve the highest entry number as a dummy */
GPR_ASSERT(entries < INVALID_ENTRY_INDEX);
- stack->entries = gpr_malloc_aligned(entries * sizeof(stack->entries[0]),
- ENTRY_ALIGNMENT_BITS);
+ stack->entries = (lockfree_node *)gpr_malloc_aligned(
+ entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS);
/* Clear out all entries */
memset(stack->entries, 0, entries * sizeof(stack->entries[0]));
memset(&stack->head, 0, sizeof(stack->head));
diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c
index a69c501e9f..9bfec7782a 100644
--- a/src/core/lib/support/time_posix.c
+++ b/src/core/lib/support/time_posix.c
@@ -42,6 +42,7 @@
#ifdef __linux__
#include <sys/syscall.h>
#endif
+#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/support/block_annotate.h"
@@ -144,7 +145,14 @@ static gpr_timespec now_impl(gpr_clock_type clock) {
gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl;
+#ifdef GPR_LOW_LEVEL_COUNTERS
+gpr_atm gpr_now_call_count;
+#endif
+
gpr_timespec gpr_now(gpr_clock_type clock_type) {
+#ifdef GPR_LOW_LEVEL_COUNTERS
+ __atomic_fetch_add(&gpr_now_call_count, 1, __ATOMIC_RELAXED);
+#endif
return gpr_now_impl(clock_type);
}
diff --git a/src/core/lib/support/tmpfile_posix.c b/src/core/lib/support/tmpfile_posix.c
index 0cd4bb6fc3..5771c158e0 100644
--- a/src/core/lib/support/tmpfile_posix.c
+++ b/src/core/lib/support/tmpfile_posix.c
@@ -50,34 +50,34 @@
FILE *gpr_tmpfile(const char *prefix, char **tmp_filename) {
FILE *result = NULL;
- char *template;
+ char *filename_template;
int fd;
if (tmp_filename != NULL) *tmp_filename = NULL;
- gpr_asprintf(&template, "/tmp/%s_XXXXXX", prefix);
- GPR_ASSERT(template != NULL);
+ gpr_asprintf(&filename_template, "/tmp/%s_XXXXXX", prefix);
+ GPR_ASSERT(filename_template != NULL);
- fd = mkstemp(template);
+ fd = mkstemp(filename_template);
if (fd == -1) {
- gpr_log(GPR_ERROR, "mkstemp failed for template %s with error %s.",
- template, strerror(errno));
+ gpr_log(GPR_ERROR, "mkstemp failed for filename_template %s with error %s.",
+ filename_template, strerror(errno));
goto end;
}
result = fdopen(fd, "w+");
if (result == NULL) {
gpr_log(GPR_ERROR, "Could not open file %s from fd %d (error = %s).",
- template, fd, strerror(errno));
- unlink(template);
+ filename_template, fd, strerror(errno));
+ unlink(filename_template);
close(fd);
goto end;
}
end:
if (result != NULL && tmp_filename != NULL) {
- *tmp_filename = template;
+ *tmp_filename = filename_template;
} else {
- gpr_free(template);
+ gpr_free(filename_template);
}
return result;
}
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2581c1c71f..8a40378a19 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -245,6 +245,7 @@ struct grpc_call {
};
int grpc_call_error_trace = 0;
+int grpc_compression_trace = 0;
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index ef14124fef..430cbd90ac 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1033,8 +1033,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
grpc_server *server = gpr_zalloc(sizeof(grpc_server));
- GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
-
gpr_mu_init(&server->mu_global);
gpr_mu_init(&server->mu_call);
gpr_cv_init(&server->starting_cv);
diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c
index 1195f75044..6aecb7fa93 100644
--- a/src/core/lib/transport/service_config.c
+++ b/src/core/lib/transport/service_config.c
@@ -162,7 +162,6 @@ static char* parse_json_method_name(grpc_json* json) {
static bool parse_json_method_config(
grpc_exec_ctx* exec_ctx, grpc_json* json,
void* (*create_value)(const grpc_json* method_config_json),
- const grpc_slice_hash_table_vtable* vtable,
grpc_slice_hash_table_entry* entries, size_t* idx) {
// Construct value.
void* method_config = create_value(json);
@@ -185,13 +184,11 @@ static bool parse_json_method_config(
// Add entry for each path.
for (size_t i = 0; i < paths.count; ++i) {
entries[*idx].key = grpc_slice_from_copied_string(paths.strs[i]);
- entries[*idx].value = vtable->copy_value(method_config);
- entries[*idx].vtable = vtable;
+ entries[*idx].value = method_config;
++*idx;
}
success = true;
done:
- vtable->destroy_value(exec_ctx, method_config);
gpr_strvec_destroy(&paths);
return success;
}
@@ -199,7 +196,7 @@ done:
grpc_slice_hash_table* grpc_service_config_create_method_config_table(
grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config,
void* (*create_value)(const grpc_json* method_config_json),
- const grpc_slice_hash_table_vtable* vtable) {
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) {
const grpc_json* json = service_config->json_tree;
// Traverse parsed JSON tree.
if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL;
@@ -220,8 +217,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
size_t idx = 0;
for (grpc_json* method = field->child; method != NULL;
method = method->next) {
- if (!parse_json_method_config(exec_ctx, method, create_value, vtable,
- entries, &idx)) {
+ if (!parse_json_method_config(exec_ctx, method, create_value, entries,
+ &idx)) {
return NULL;
}
}
@@ -231,12 +228,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table(
// Instantiate method config table.
grpc_slice_hash_table* method_config_table = NULL;
if (entries != NULL) {
- method_config_table = grpc_slice_hash_table_create(num_entries, entries);
- // Clean up.
- for (size_t i = 0; i < num_entries; ++i) {
- grpc_slice_unref_internal(exec_ctx, entries[i].key);
- vtable->destroy_value(exec_ctx, entries[i].value);
- }
+ method_config_table =
+ grpc_slice_hash_table_create(num_entries, entries, destroy_value);
gpr_free(entries);
}
return method_config_table;
diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h
index ebfc59b534..e0548b9c3f 100644
--- a/src/core/lib/transport/service_config.h
+++ b/src/core/lib/transport/service_config.h
@@ -57,12 +57,12 @@ const char* grpc_service_config_get_lb_policy_name(
/// Creates a method config table based on the data in \a json.
/// The table's keys are request paths. The table's value type is
/// returned by \a create_value(), based on data parsed from the JSON tree.
-/// \a vtable provides methods used to manage the values.
+/// \a destroy_value is used to clean up values.
/// Returns NULL on error.
grpc_slice_hash_table* grpc_service_config_create_method_config_table(
grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config,
void* (*create_value)(const grpc_json* method_config_json),
- const grpc_slice_hash_table_vtable* vtable);
+ void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value));
/// A helper function for looking up values in the table returned by
/// \a grpc_service_config_create_method_config_table().
diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h
index 7645bb1d34..5e9324c445 100644
--- a/src/objective-c/GRPCClient/GRPCCall.h
+++ b/src/objective-c/GRPCClient/GRPCCall.h
@@ -253,6 +253,13 @@ extern id const kGRPCTrailersKey;
*/
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path;
+/**
+ * Set the dispatch queue to be used for callbacks.
+ *
+ * This configuration is only effective before the call starts.
+ */
+- (void)setResponseDispatchQueue:(dispatch_queue_t)queue;
+
// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?
@end
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 051138ea4d..f9d13fea57 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -113,6 +113,10 @@ static NSMutableDictionary *callFlags;
// the SendClose op is added.
BOOL _unaryCall;
NSMutableArray *_unaryOpBatch;
+
+ // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
+ // queue
+ dispatch_queue_t _responseQueue;
}
@synthesize state = _state;
@@ -175,10 +179,19 @@ static NSMutableDictionary *callFlags;
_unaryCall = YES;
_unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
}
+
+ _responseQueue = dispatch_get_main_queue();
}
return self;
}
+- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ _responseQueue = queue;
+}
+
#pragma mark Finish
- (void)finishWithError:(NSError *)errorOrNil {
@@ -424,7 +437,8 @@ static NSMutableDictionary *callFlags;
// that the life of the instance is determined by this retain cycle.
_retainSelf = self;
- _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
+ _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable
+ dispatchQueue:_responseQueue];
_wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host path:_path];
NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
index b2775f98b5..07004f6d4d 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
@@ -53,7 +53,9 @@
* The GRXWriteable instance is retained until writesFinishedWithError: is sent to it, and released
* after that.
*/
-- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable NS_DESIGNATED_INITIALIZER;
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
+ dispatchQueue:(dispatch_queue_t)queue NS_DESIGNATED_INITIALIZER;
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable;
/**
* Enqueues writeValue: to be sent to the writeable in the main thread.
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 08bd079aea..88aa7a7282 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -51,14 +51,20 @@
}
// Designated initializer
-- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
+ dispatchQueue:(dispatch_queue_t)queue {
if (self = [super init]) {
- _writeableQueue = dispatch_get_main_queue();
+ _writeableQueue = queue;
_writeable = writeable;
}
return self;
}
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
+ return [self initWithWriteable:writeable
+ dispatchQueue:dispatch_get_main_queue()];
+}
+
- (void)enqueueValue:(id)value completionHandler:(void (^)())handler {
dispatch_async(_writeableQueue, ^{
// We're racing a possible cancellation performed by another thread. To turn all already-
diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m
index 76c15003f6..e36f5c3ee9 100644
--- a/src/objective-c/tests/GRPCClientTests.m
+++ b/src/objective-c/tests/GRPCClientTests.m
@@ -353,4 +353,59 @@ static GRPCProtoMethod *kUnaryCallMethod;
[self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
+- (void)testAlternateDispatchQueue {
+ const int32_t kPayloadSize = 100;
+ RMTSimpleRequest *request = [RMTSimpleRequest message];
+ request.responseSize = kPayloadSize;
+
+ __weak XCTestExpectation *expectation1 = [self expectationWithDescription:@"AlternateDispatchQueue1"];
+
+ // Use default (main) dispatch queue
+ NSString *main_queue_label = [NSString stringWithUTF8String:dispatch_queue_get_label(dispatch_get_main_queue())];
+
+ GRXWriter *requestsWriter1 = [GRXWriter writerWithValue:[request data]];
+
+ GRPCCall *call1 = [[GRPCCall alloc] initWithHost:kHostAddress
+ path:kUnaryCallMethod.HTTPPath
+ requestsWriter:requestsWriter1];
+
+ id<GRXWriteable> responsesWriteable1 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
+ NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)];
+ XCTAssert([label isEqualToString:main_queue_label]);
+
+ [expectation1 fulfill];
+ } completionHandler:^(NSError *errorOrNil) {
+ }];
+
+ [call1 startWithWriteable:responsesWriteable1];
+
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
+
+ // Use a custom queue
+ __weak XCTestExpectation *expectation2 = [self expectationWithDescription:@"AlternateDispatchQueue2"];
+
+ NSString *queue_label = @"test.queue1";
+ dispatch_queue_t queue = dispatch_queue_create([queue_label UTF8String], DISPATCH_QUEUE_SERIAL);
+
+ GRXWriter *requestsWriter2 = [GRXWriter writerWithValue:[request data]];
+
+ GRPCCall *call2 = [[GRPCCall alloc] initWithHost:kHostAddress
+ path:kUnaryCallMethod.HTTPPath
+ requestsWriter:requestsWriter2];
+
+ [call2 setResponseDispatchQueue:queue];
+
+ id<GRXWriteable> responsesWriteable2 = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) {
+ NSString *label = [NSString stringWithUTF8String:dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)];
+ XCTAssert([label isEqualToString:queue_label]);
+
+ [expectation2 fulfill];
+ } completionHandler:^(NSError *errorOrNil) {
+ }];
+
+ [call2 startWithWriteable:responsesWriteable2];
+
+ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
+}
+
@end
diff --git a/src/proto/grpc/status/BUILD b/src/proto/grpc/status/BUILD
index c17f87eb3d..71363bd1b6 100644
--- a/src/proto/grpc/status/BUILD
+++ b/src/proto/grpc/status/BUILD
@@ -37,7 +37,5 @@ grpc_proto_library(
name = "status_proto",
srcs = ["status.proto"],
has_services = False,
- well_known_protos = "@submodule_protobuf//:well_known_protos",
+ well_known_protos = "@com_google_protobuf//:well_known_protos",
)
-
-
diff --git a/src/python/grpcio_tests/tests/http2/negative_http2_client.py b/src/python/grpcio_tests/tests/http2/negative_http2_client.py
index b184e62cfd..90f54e80bb 100644
--- a/src/python/grpcio_tests/tests/http2/negative_http2_client.py
+++ b/src/python/grpcio_tests/tests/http2/negative_http2_client.py
@@ -96,8 +96,6 @@ def _rst_during_data(stub):
def _rst_after_data(stub):
resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST)
- _validate_payload_type_and_length(
- next(resp_future), messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
_validate_status_code_and_details(resp_future, grpc.StatusCode.INTERNAL,
"Received RST_STREAM with error code 0")