aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_parser.c38
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c24
-rw-r--r--src/core/lib/channel/channel_args.h14
-rw-r--r--src/core/lib/client_config/lb_policies/pick_first.c43
-rw-r--r--src/core/lib/client_config/lb_policies/round_robin.c59
-rw-r--r--src/core/lib/client_config/lb_policy_factory.c5
-rw-r--r--src/core/lib/client_config/lb_policy_factory.h15
-rw-r--r--src/core/lib/client_config/lb_policy_registry.c4
-rw-r--r--src/core/lib/client_config/lb_policy_registry.h3
-rw-r--r--src/core/lib/client_config/resolvers/dns_resolver.c23
-rw-r--r--src/core/lib/client_config/resolvers/sockaddr_resolver.c91
-rw-r--r--src/core/lib/client_config/resolvers/zookeeper_resolver.c26
-rw-r--r--src/core/lib/http/parser.c16
-rw-r--r--src/core/lib/json/json_reader.c7
-rw-r--r--src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs11
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs195
-rw-r--r--src/csharp/Grpc.IntegrationTesting/GenericService.cs71
-rw-r--r--src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj1
-rw-r--r--src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs12
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ServerRunners.cs65
-rw-r--r--src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h2
21 files changed, 527 insertions, 198 deletions
diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c
index 793e945331..259452fa42 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_parser.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c
@@ -52,6 +52,8 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/string.h"
+extern int grpc_http_trace;
+
typedef enum {
NOT_BINARY,
B64_BYTE0,
@@ -723,7 +725,9 @@ static int finish_indexed_field(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
const uint8_t *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
if (md == NULL) {
- gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index);
+ if (grpc_http_trace) {
+ gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index);
+ }
return 0;
}
GRPC_MDELEM_REF(md);
@@ -919,7 +923,9 @@ static int parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
/* finish parsing a max table size change */
static int finish_max_tbl_size(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
const uint8_t *end) {
- gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index);
+ if (grpc_http_trace) {
+ gpr_log(GPR_INFO, "MAX TABLE SIZE: %d", p->index);
+ }
return grpc_chttp2_hptbl_set_current_table_size(&p->table, p->index) &&
parse_begin(p, cur, end);
}
@@ -960,7 +966,9 @@ static int parse_error(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
static int parse_illegal_op(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
const uint8_t *end) {
GPR_ASSERT(cur != end);
- gpr_log(GPR_DEBUG, "Illegal hpack op code %d", *cur);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "Illegal hpack op code %d", *cur);
+ }
return parse_error(p, cur, end);
}
@@ -1069,10 +1077,12 @@ static int parse_value4(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
}
error:
- gpr_log(GPR_ERROR,
- "integer overflow in hpack integer decoding: have 0x%08x, "
- "got byte 0x%02x on byte 5",
- *p->parsing.value, *cur);
+ if (grpc_http_trace) {
+ gpr_log(GPR_ERROR,
+ "integer overflow in hpack integer decoding: have 0x%08x, "
+ "got byte 0x%02x on byte 5",
+ *p->parsing.value, *cur);
+ }
return parse_error(p, cur, end);
}
@@ -1094,10 +1104,12 @@ static int parse_value5up(grpc_chttp2_hpack_parser *p, const uint8_t *cur,
return parse_next(p, cur + 1, end);
}
- gpr_log(GPR_ERROR,
- "integer overflow in hpack integer decoding: have 0x%08x, "
- "got byte 0x%02x sometime after byte 5",
- *p->parsing.value, *cur);
+ if (grpc_http_trace) {
+ gpr_log(GPR_ERROR,
+ "integer overflow in hpack integer decoding: have 0x%08x, "
+ "got byte 0x%02x sometime after byte 5",
+ *p->parsing.value, *cur);
+ }
return parse_error(p, cur, end);
}
@@ -1329,7 +1341,9 @@ static is_binary_header is_binary_literal_header(grpc_chttp2_hpack_parser *p) {
static is_binary_header is_binary_indexed_header(grpc_chttp2_hpack_parser *p) {
grpc_mdelem *elem = grpc_chttp2_hptbl_lookup(&p->table, p->index);
if (!elem) {
- gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index);
+ if (grpc_http_trace) {
+ gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index);
+ }
return ERROR_HEADER;
}
return grpc_is_binary_header(
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index d02f4ddc61..67cd1bb10a 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -41,6 +41,8 @@
#include "src/core/lib/support/murmur_hash.h"
+extern int grpc_http_trace;
+
static struct {
const char *key;
const char *value;
@@ -264,12 +266,16 @@ int grpc_chttp2_hptbl_set_current_table_size(grpc_chttp2_hptbl *tbl,
return 1;
}
if (bytes > tbl->max_bytes) {
- gpr_log(GPR_ERROR,
- "Attempt to make hpack table %d bytes when max is %d bytes", bytes,
- tbl->max_bytes);
+ if (grpc_http_trace) {
+ gpr_log(GPR_ERROR,
+ "Attempt to make hpack table %d bytes when max is %d bytes",
+ bytes, tbl->max_bytes);
+ }
return 0;
}
- gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "Update hpack parser table size to %d", bytes);
+ }
while (tbl->mem_used > bytes) {
evict1(tbl);
}
@@ -293,10 +299,12 @@ int grpc_chttp2_hptbl_add(grpc_chttp2_hptbl *tbl, grpc_mdelem *md) {
GRPC_CHTTP2_HPACK_ENTRY_OVERHEAD;
if (tbl->current_table_bytes > tbl->max_bytes) {
- gpr_log(GPR_ERROR,
- "HPACK max table size reduced to %d but not reflected by hpack "
- "stream (still at %d)",
- tbl->max_bytes, tbl->current_table_bytes);
+ if (grpc_http_trace) {
+ gpr_log(GPR_ERROR,
+ "HPACK max table size reduced to %d but not reflected by hpack "
+ "stream (still at %d)",
+ tbl->max_bytes, tbl->current_table_bytes);
+ }
return 0;
}
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 67d287ec6b..1ea202c543 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -37,23 +37,23 @@
#include <grpc/compression.h>
#include <grpc/grpc.h>
-/* Copy some arguments */
+/** Copy the arguments in \a src into a new instance */
grpc_channel_args *grpc_channel_args_copy(const grpc_channel_args *src);
-/* Copy some arguments, stably sorting keys */
-grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *a);
+/** Copy the arguments in \a src into a new instance, stably sorting keys */
+grpc_channel_args *grpc_channel_args_normalize(const grpc_channel_args *src);
-/** Copy some arguments and add the to_add parameter in the end.
- If to_add is NULL, it is equivalent to call grpc_channel_args_copy. */
+/** Copy the arguments in \a src and append \a to_add. If \a to_add is NULL, it
+ * is equivalent to calling \a grpc_channel_args_copy. */
grpc_channel_args *grpc_channel_args_copy_and_add(const grpc_channel_args *src,
const grpc_arg *to_add,
size_t num_to_add);
-/** Copy args from a then args from b into a new channel args */
+/** Concatenate args from \a a and \a b into a new instance */
grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
const grpc_channel_args *b);
-/** Destroy arguments created by grpc_channel_args_copy */
+/** Destroy arguments created by \a grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
diff --git a/src/core/lib/client_config/lb_policies/pick_first.c b/src/core/lib/client_config/lb_policies/pick_first.c
index 2e399b73f9..9ac550d9e7 100644
--- a/src/core/lib/client_config/lb_policies/pick_first.c
+++ b/src/core/lib/client_config/lb_policies/pick_first.c
@@ -32,11 +32,11 @@
*/
#include "src/core/lib/client_config/lb_policies/pick_first.h"
-#include "src/core/lib/client_config/lb_policy_factory.h"
#include <string.h>
#include <grpc/support/alloc.h>
+#include "src/core/lib/client_config/lb_policy_factory.h"
#include "src/core/lib/transport/connectivity_state.h"
typedef struct pending_pick {
@@ -391,19 +391,42 @@ static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
-static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
+static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- if (args->num_subchannels == 0) return NULL;
+ GPR_ASSERT(args->addresses != NULL);
+ GPR_ASSERT(args->subchannel_factory != NULL);
+
+ if (args->addresses->naddrs == 0) return NULL;
+
pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
memset(p, 0, sizeof(*p));
- grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
+
p->subchannels =
- gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
- p->num_subchannels = args->num_subchannels;
- grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
- "pick_first");
- memcpy(p->subchannels, args->subchannels,
- sizeof(grpc_subchannel *) * args->num_subchannels);
+ gpr_malloc(sizeof(grpc_subchannel *) * args->addresses->naddrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+ grpc_subchannel_args sc_args;
+ size_t subchannel_idx = 0;
+ for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
+ sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+
+ grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
+ exec_ctx, args->subchannel_factory, &sc_args);
+
+ if (subchannel != NULL) {
+ p->subchannels[subchannel_idx++] = subchannel;
+ }
+ }
+ if (subchannel_idx == 0) {
+ gpr_free(p->subchannels);
+ gpr_free(p);
+ return NULL;
+ }
+ p->num_subchannels = subchannel_idx;
+
+ grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
gpr_mu_init(&p->mu);
return &p->base;
diff --git a/src/core/lib/client_config/lb_policies/round_robin.c b/src/core/lib/client_config/lb_policies/round_robin.c
index c904c5f921..a4bc2c4786 100644
--- a/src/core/lib/client_config/lb_policies/round_robin.c
+++ b/src/core/lib/client_config/lb_policies/round_robin.c
@@ -496,30 +496,47 @@ static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
-static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
+static grpc_lb_policy *create_round_robin(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args) {
- size_t i;
+ GPR_ASSERT(args->addresses != NULL);
+ GPR_ASSERT(args->subchannel_factory != NULL);
+
round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
- GPR_ASSERT(args->num_subchannels > 0);
memset(p, 0, sizeof(*p));
- grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
- p->num_subchannels = args->num_subchannels;
- p->subchannels = gpr_malloc(sizeof(*p->subchannels) * p->num_subchannels);
- memset(p->subchannels, 0, sizeof(*p->subchannels) * p->num_subchannels);
- grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
- "round_robin");
- gpr_mu_init(&p->mu);
- for (i = 0; i < args->num_subchannels; i++) {
- subchannel_data *sd = gpr_malloc(sizeof(*sd));
- memset(sd, 0, sizeof(*sd));
- p->subchannels[i] = sd;
- sd->policy = p;
- sd->index = i;
- sd->subchannel = args->subchannels[i];
- grpc_closure_init(&sd->connectivity_changed_closure,
- rr_connectivity_changed, sd);
+ p->subchannels =
+ gpr_malloc(sizeof(*p->subchannels) * args->addresses->naddrs);
+ memset(p->subchannels, 0, sizeof(*p->subchannels) * args->addresses->naddrs);
+
+ grpc_subchannel_args sc_args;
+ size_t subchannel_idx = 0;
+ for (size_t i = 0; i < args->addresses->naddrs; i++) {
+ memset(&sc_args, 0, sizeof(grpc_subchannel_args));
+ sc_args.addr = (struct sockaddr *)(args->addresses->addrs[i].addr);
+ sc_args.addr_len = (size_t)args->addresses->addrs[i].len;
+
+ grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
+ exec_ctx, args->subchannel_factory, &sc_args);
+
+ if (subchannel != NULL) {
+ subchannel_data *sd = gpr_malloc(sizeof(*sd));
+ memset(sd, 0, sizeof(*sd));
+ p->subchannels[subchannel_idx] = sd;
+ sd->policy = p;
+ sd->index = subchannel_idx;
+ sd->subchannel = subchannel;
+ ++subchannel_idx;
+ grpc_closure_init(&sd->connectivity_changed_closure,
+ rr_connectivity_changed, sd);
+ }
+ }
+ if (subchannel_idx == 0) {
+ gpr_free(p->subchannels);
+ gpr_free(p);
+ return NULL;
}
+ p->num_subchannels = subchannel_idx;
/* The (dummy node) root of the ready list */
p->ready_list.subchannel = NULL;
@@ -527,6 +544,10 @@ static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
p->ready_list.next = NULL;
p->ready_list_last_pick = &p->ready_list;
+ grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
+ grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
+ "round_robin");
+ gpr_mu_init(&p->mu);
return &p->base;
}
diff --git a/src/core/lib/client_config/lb_policy_factory.c b/src/core/lib/client_config/lb_policy_factory.c
index 2ca6f42f89..ede1d624af 100644
--- a/src/core/lib/client_config/lb_policy_factory.c
+++ b/src/core/lib/client_config/lb_policy_factory.c
@@ -42,7 +42,8 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory* factory) {
}
grpc_lb_policy* grpc_lb_policy_factory_create_lb_policy(
- grpc_lb_policy_factory* factory, grpc_lb_policy_args* args) {
+ grpc_exec_ctx* exec_ctx, grpc_lb_policy_factory* factory,
+ grpc_lb_policy_args* args) {
if (factory == NULL) return NULL;
- return factory->vtable->create_lb_policy(factory, args);
+ return factory->vtable->create_lb_policy(exec_ctx, factory, args);
}
diff --git a/src/core/lib/client_config/lb_policy_factory.h b/src/core/lib/client_config/lb_policy_factory.h
index 36eaf178d9..9a93a8ca3f 100644
--- a/src/core/lib/client_config/lb_policy_factory.h
+++ b/src/core/lib/client_config/lb_policy_factory.h
@@ -35,7 +35,10 @@
#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H
#include "src/core/lib/client_config/lb_policy.h"
-#include "src/core/lib/client_config/subchannel.h"
+#include "src/core/lib/client_config/subchannel_factory.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+
+#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@@ -47,8 +50,8 @@ struct grpc_lb_policy_factory {
};
typedef struct grpc_lb_policy_args {
- grpc_subchannel **subchannels;
- size_t num_subchannels;
+ grpc_resolved_addresses *addresses;
+ grpc_subchannel_factory *subchannel_factory;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
@@ -56,7 +59,8 @@ struct grpc_lb_policy_factory_vtable {
void (*unref)(grpc_lb_policy_factory *factory);
/** Implementation of grpc_lb_policy_factory_create_lb_policy */
- grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory,
+ grpc_lb_policy *(*create_lb_policy)(grpc_exec_ctx *exec_ctx,
+ grpc_lb_policy_factory *factory,
grpc_lb_policy_args *args);
/** Name for the LB policy this factory implements */
@@ -68,6 +72,7 @@ void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory);
/** Create a lb_policy instance. */
grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy(
- grpc_lb_policy_factory *factory, grpc_lb_policy_args *args);
+ grpc_exec_ctx *exec_ctx, grpc_lb_policy_factory *factory,
+ grpc_lb_policy_args *args);
#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_FACTORY_H */
diff --git a/src/core/lib/client_config/lb_policy_registry.c b/src/core/lib/client_config/lb_policy_registry.c
index 13acfe78cd..f703e630a0 100644
--- a/src/core/lib/client_config/lb_policy_registry.c
+++ b/src/core/lib/client_config/lb_policy_registry.c
@@ -79,10 +79,10 @@ static grpc_lb_policy_factory *lookup_factory(const char *name) {
return NULL;
}
-grpc_lb_policy *grpc_lb_policy_create(const char *name,
+grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
grpc_lb_policy_args *args) {
grpc_lb_policy_factory *factory = lookup_factory(name);
grpc_lb_policy *lb_policy =
- grpc_lb_policy_factory_create_lb_policy(factory, args);
+ grpc_lb_policy_factory_create_lb_policy(exec_ctx, factory, args);
return lb_policy;
}
diff --git a/src/core/lib/client_config/lb_policy_registry.h b/src/core/lib/client_config/lb_policy_registry.h
index c251fd9f08..789854bd20 100644
--- a/src/core/lib/client_config/lb_policy_registry.h
+++ b/src/core/lib/client_config/lb_policy_registry.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H
#include "src/core/lib/client_config/lb_policy_factory.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
/** Initialize the registry and set \a default_factory as the factory to be
* returned when no name is provided in a lookup */
@@ -48,7 +49,7 @@ void grpc_register_lb_policy(grpc_lb_policy_factory *factory);
*
* If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init
* will be returned. */
-grpc_lb_policy *grpc_lb_policy_create(const char *name,
+grpc_lb_policy *grpc_lb_policy_create(grpc_exec_ctx *exec_ctx, const char *name,
grpc_lb_policy_args *args);
#endif /* GRPC_CORE_LIB_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */
diff --git a/src/core/lib/client_config/resolvers/dns_resolver.c b/src/core/lib/client_config/resolvers/dns_resolver.c
index ab445730ad..62bccdd045 100644
--- a/src/core/lib/client_config/resolvers/dns_resolver.c
+++ b/src/core/lib/client_config/resolvers/dns_resolver.c
@@ -162,38 +162,23 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) {
dns_resolver *r = arg;
grpc_client_config *config = NULL;
- grpc_subchannel **subchannels;
- grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
- size_t i;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving);
r->resolving = 0;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
- subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
- size_t naddrs = 0;
- for (i = 0; i < addresses->naddrs; i++) {
- memset(&args, 0, sizeof(args));
- args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
- args.addr_len = (size_t)addresses->addrs[i].len;
- grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel(
- exec_ctx, r->subchannel_factory, &args);
- if (subchannel != NULL) {
- subchannels[naddrs++] = subchannel;
- }
- }
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.subchannels = subchannels;
- lb_policy_args.num_subchannels = naddrs;
- lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
+ lb_policy_args.addresses = addresses;
+ lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
if (lb_policy != NULL) {
grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
}
grpc_resolved_addresses_destroy(addresses);
- gpr_free(subchannels);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now);
diff --git a/src/core/lib/client_config/resolvers/sockaddr_resolver.c b/src/core/lib/client_config/resolvers/sockaddr_resolver.c
index 66cddc3ed9..5c5133649a 100644
--- a/src/core/lib/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/lib/client_config/resolvers/sockaddr_resolver.c
@@ -58,11 +58,7 @@ typedef struct {
char *lb_policy_name;
/** the addresses that we've 'resolved' */
- struct sockaddr_storage *addrs;
- /** the corresponding length of the addresses */
- size_t *addrs_len;
- /** how many elements in \a addrs */
- size_t num_addrs;
+ grpc_resolved_addresses *addresses;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -125,28 +121,14 @@ static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver,
static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
sockaddr_resolver *r) {
- grpc_client_config *cfg;
- grpc_lb_policy *lb_policy;
- grpc_lb_policy_args lb_policy_args;
- grpc_subchannel **subchannels;
- grpc_subchannel_args args;
-
if (r->next_completion != NULL && !r->published) {
- size_t i;
- cfg = grpc_client_config_create();
- subchannels = gpr_malloc(sizeof(grpc_subchannel *) * r->num_addrs);
- for (i = 0; i < r->num_addrs; i++) {
- memset(&args, 0, sizeof(args));
- args.addr = (struct sockaddr *)&r->addrs[i];
- args.addr_len = r->addrs_len[i];
- subchannels[i] = grpc_subchannel_factory_create_subchannel(
- exec_ctx, r->subchannel_factory, &args);
- }
+ grpc_client_config *cfg = grpc_client_config_create();
+ grpc_lb_policy_args lb_policy_args;
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
- lb_policy_args.subchannels = subchannels;
- lb_policy_args.num_subchannels = r->num_addrs;
- lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
- gpr_free(subchannels);
+ lb_policy_args.addresses = r->addresses;
+ lb_policy_args.subchannel_factory = r->subchannel_factory;
+ grpc_lb_policy *lb_policy =
+ grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(cfg, lb_policy);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
@@ -160,8 +142,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(exec_ctx, r->subchannel_factory);
- gpr_free(r->addrs);
- gpr_free(r->addrs_len);
+ grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r->lb_policy_name);
gpr_free(r);
}
@@ -269,7 +250,6 @@ static void do_nothing(void *ignored) {}
static grpc_resolver *sockaddr_create(
grpc_resolver_args *args, const char *default_lb_policy_name,
int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) {
- size_t i;
int errors_found = 0; /* GPR_FALSE */
sockaddr_resolver *r;
gpr_slice path_slice;
@@ -287,17 +267,46 @@ static grpc_resolver *sockaddr_create(
r->lb_policy_name = NULL;
if (0 != strcmp(args->uri->query, "")) {
gpr_slice query_slice;
- gpr_slice_buffer query_parts;
+ gpr_slice_buffer query_parts; /* the &-separated elements of the query */
+ gpr_slice_buffer query_param_parts; /* the =-separated subelements */
query_slice =
gpr_slice_new(args->uri->query, strlen(args->uri->query), do_nothing);
gpr_slice_buffer_init(&query_parts);
- gpr_slice_split(query_slice, "=", &query_parts);
- GPR_ASSERT(query_parts.count == 2);
- if (0 == gpr_slice_str_cmp(query_parts.slices[0], "lb_policy")) {
- r->lb_policy_name = gpr_dump_slice(query_parts.slices[1], GPR_DUMP_ASCII);
+ gpr_slice_buffer_init(&query_param_parts);
+ /* the query can contain "lb_policy=<policy>" and "lb_enabled=<1|0>" */
+
+ bool lb_enabled;
+ gpr_slice_split(query_slice, "&", &query_parts);
+ for (i = 0; i < query_parts.count; i++) {
+ gpr_slice_split(query_parts.slices[i], "=", &query_param_parts);
+ GPR_ASSERT(query_param_parts.count == 2);
+ if (0 == gpr_slice_str_cmp(query_param_parts.slices[0], "lb_policy")) {
+ r->lb_policy_name =
+ gpr_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII);
+ } else if (0 ==
+ gpr_slice_str_cmp(query_param_parts.slices[0], "lb_enabled")) {
+ if (0 != gpr_slice_str_cmp(query_param_parts.slices[1], "0")) {
+ /* anything other than 0 is taken to be true */
+ lb_enabled = true;
+ }
+ } else {
+ gpr_log(GPR_ERROR, "invalid query element value: '%s'",
+ query_parts.slices[0]);
+ }
+ gpr_slice_buffer_reset_and_unref(&query_param_parts);
+ }
+
+ if (strcmp("grpclb", r->lb_policy_name) == 0 && !lb_enabled) {
+ /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail
+ * out, as this is meant mostly for tests. */
+ gpr_log(GPR_ERROR,
+ "Requested 'grpclb' LB policy but resolved addresses don't "
+ "support load balancing.");
+ abort();
}
gpr_slice_buffer_destroy(&query_parts);
+ gpr_slice_buffer_destroy(&query_param_parts);
gpr_slice_unref(query_slice);
}
if (r->lb_policy_name == NULL) {
@@ -309,15 +318,18 @@ static grpc_resolver *sockaddr_create(
gpr_slice_buffer_init(&path_parts);
gpr_slice_split(path_slice, ",", &path_parts);
- r->num_addrs = path_parts.count;
- r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs);
- r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs);
+ r->addresses = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->addresses->naddrs = path_parts.count;
+ r->addresses->addrs =
+ gpr_malloc(sizeof(grpc_resolved_address) * r->addresses->naddrs);
- for (i = 0; i < r->num_addrs; i++) {
+ for (size_t i = 0; i < r->addresses->naddrs; i++) {
grpc_uri ith_uri = *args->uri;
char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII);
ith_uri.path = part_str;
- if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) {
+ if (!parse(&ith_uri,
+ (struct sockaddr_storage *)(&r->addresses->addrs[i].addr),
+ &r->addresses->addrs[i].len)) {
errors_found = 1; /* GPR_TRUE */
}
gpr_free(part_str);
@@ -328,8 +340,7 @@ static grpc_resolver *sockaddr_create(
gpr_slice_unref(path_slice);
if (errors_found) {
gpr_free(r->lb_policy_name);
- gpr_free(r->addrs);
- gpr_free(r->addrs_len);
+ grpc_resolved_addresses_destroy(r->addresses);
gpr_free(r);
return NULL;
}
diff --git a/src/core/lib/client_config/resolvers/zookeeper_resolver.c b/src/core/lib/client_config/resolvers/zookeeper_resolver.c
index 3bb0bbdf5c..404dfcd423 100644
--- a/src/core/lib/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/lib/client_config/resolvers/zookeeper_resolver.c
@@ -184,28 +184,22 @@ static void zookeeper_on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
grpc_resolved_addresses *addresses) {
zookeeper_resolver *r = arg;
grpc_client_config *config = NULL;
- grpc_subchannel **subchannels;
- grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
- size_t i;
+
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
config = grpc_client_config_create();
- subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs);
- for (i = 0; i < addresses->naddrs; i++) {
- memset(&args, 0, sizeof(args));
- args.addr = (struct sockaddr *)(addresses->addrs[i].addr);
- args.addr_len = addresses->addrs[i].len;
- subchannels[i] = grpc_subchannel_factory_create_subchannel(
- exec_ctx, r->subchannel_factory, &args);
+
+ lb_policy_args.addresses = addresses;
+ lb_policy_args.subchannel_factory = r->subchannel_factory;
+ lb_policy =
+ grpc_lb_policy_create(exec_ctx, r->lb_policy_name, &lb_policy_args);
+
+ if (lb_policy != NULL) {
+ grpc_client_config_set_lb_policy(config, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
}
- lb_policy_args.subchannels = subchannels;
- lb_policy_args.num_subchannels = addresses->naddrs;
- lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
- grpc_client_config_set_lb_policy(config, lb_policy);
- GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction");
grpc_resolved_addresses_destroy(addresses);
- gpr_free(subchannels);
}
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->resolving == 1);
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index 5d4e304615..2782ad758e 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -39,6 +39,8 @@
#include <grpc/support/log.h>
#include <grpc/support/useful.h>
+extern int grpc_http_trace;
+
static char *buf2str(void *buffer, size_t length) {
char *out = gpr_malloc(length + 1);
memcpy(out, buffer, length);
@@ -72,7 +74,7 @@ static int handle_response_line(grpc_http_parser *parser) {
return 1;
error:
- gpr_log(GPR_ERROR, "Failed parsing response line");
+ if (grpc_http_trace) gpr_log(GPR_ERROR, "Failed parsing response line");
return 0;
}
@@ -125,7 +127,7 @@ static int handle_request_line(grpc_http_parser *parser) {
return 1;
error:
- gpr_log(GPR_ERROR, "Failed parsing request line");
+ if (grpc_http_trace) gpr_log(GPR_ERROR, "Failed parsing request line");
return 0;
}
@@ -150,7 +152,8 @@ static int add_header(grpc_http_parser *parser) {
GPR_ASSERT(cur != end);
if (*cur == ' ' || *cur == '\t') {
- gpr_log(GPR_ERROR, "Continued header lines not supported yet");
+ if (grpc_http_trace)
+ gpr_log(GPR_ERROR, "Continued header lines not supported yet");
goto error;
}
@@ -158,7 +161,7 @@ static int add_header(grpc_http_parser *parser) {
cur++;
}
if (cur == end) {
- gpr_log(GPR_ERROR, "Didn't find ':' in header string");
+ if (grpc_http_trace) gpr_log(GPR_ERROR, "Didn't find ':' in header string");
goto error;
}
GPR_ASSERT(cur >= beg);
@@ -249,8 +252,9 @@ static int addbyte(grpc_http_parser *parser, uint8_t byte) {
case GRPC_HTTP_FIRST_LINE:
case GRPC_HTTP_HEADERS:
if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) {
- gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded",
- GRPC_HTTP_PARSER_MAX_HEADER_LENGTH);
+ if (grpc_http_trace)
+ gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded",
+ GRPC_HTTP_PARSER_MAX_HEADER_LENGTH);
return 0;
}
parser->cur_line[parser->cur_line_length] = byte;
diff --git a/src/core/lib/json/json_reader.c b/src/core/lib/json/json_reader.c
index 0807f029ce..4cff13dff1 100644
--- a/src/core/lib/json/json_reader.c
+++ b/src/core/lib/json/json_reader.c
@@ -280,13 +280,14 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader *reader) {
break;
case GRPC_JSON_STATE_OBJECT_KEY_STRING:
- GPR_ASSERT(reader->unicode_high_surrogate == 0);
+ if (reader->unicode_high_surrogate != 0)
+ return GRPC_JSON_PARSE_ERROR;
if (c == '"') {
reader->state = GRPC_JSON_STATE_OBJECT_KEY_END;
json_reader_set_key(reader);
json_reader_string_clear(reader);
} else {
- if (c <= 0x001f) return GRPC_JSON_PARSE_ERROR;
+ if (c < 32) return GRPC_JSON_PARSE_ERROR;
json_reader_string_add_char(reader, c);
}
break;
@@ -362,6 +363,8 @@ grpc_json_reader_status grpc_json_reader_run(grpc_json_reader *reader) {
reader->in_object = 0;
reader->in_array = 1;
break;
+ default:
+ return GRPC_JSON_PARSE_ERROR;
}
break;
diff --git a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
index 47a15224f1..1edeedae2f 100644
--- a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
+++ b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceImpl.cs
@@ -1,6 +1,6 @@
#region Copyright notice and license
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -46,16 +46,13 @@ namespace Grpc.Testing
/// </summary>
public class BenchmarkServiceImpl : BenchmarkService.IBenchmarkService
{
- private readonly int responseSize;
-
- public BenchmarkServiceImpl(int responseSize)
+ public BenchmarkServiceImpl()
{
- this.responseSize = responseSize;
}
public Task<SimpleResponse> UnaryCall(SimpleRequest request, ServerCallContext context)
{
- var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
+ var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
return Task.FromResult(response);
}
@@ -63,7 +60,7 @@ namespace Grpc.Testing
{
await requestStream.ForEachAsync(async request =>
{
- var response = new SimpleResponse { Payload = CreateZerosPayload(responseSize) };
+ var response = new SimpleResponse { Payload = CreateZerosPayload(request.ResponseSize) };
await responseStream.WriteAsync(response);
});
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index c4016012cb..e6dc2321c4 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
+using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@@ -50,42 +51,65 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start client runners for performance testing.
/// </summary>
- public static class ClientRunners
+ public class ClientRunners
{
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();
+
/// <summary>
/// Creates a started client runner.
/// </summary>
public static IClientRunner CreateStarted(ClientConfig config)
{
+ Logger.Debug("ClientConfig: {0}", config);
string target = config.ServerTargets.Single();
- GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop);
+ GrpcPreconditions.CheckArgument(config.LoadParams.LoadCase == LoadParams.LoadOneofCase.ClosedLoop,
+ "Only closed loop scenario supported for C#");
+ GrpcPreconditions.CheckArgument(config.ClientChannels == 1, "ClientConfig.ClientChannels needs to be 1");
- var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
- var channel = new Channel(target, credentials);
+ if (config.OutstandingRpcsPerChannel != 0)
+ {
+ Logger.Warning("ClientConfig.OutstandingRpcsPerChannel is not supported for C#. Ignoring the value");
+ }
+ if (config.AsyncClientThreads != 0)
+ {
+ Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreLimit != 0)
+ {
+ Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreList.Count > 0)
+ {
+ Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
+ }
- switch (config.RpcType)
+ var credentials = config.SecurityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
+ List<ChannelOption> channelOptions = null;
+ if (config.SecurityParams != null && config.SecurityParams.ServerHostOverride != "")
{
- case RpcType.UNARY:
- return new SyncUnaryClientRunner(channel,
- config.PayloadConfig.SimpleParams.ReqSize,
- config.HistogramParams);
-
- case RpcType.STREAMING:
- default:
- throw new ArgumentException("Unsupported RpcType.");
+ channelOptions = new List<ChannelOption>
+ {
+ new ChannelOption(ChannelOptions.SslTargetNameOverride, config.SecurityParams.ServerHostOverride)
+ };
}
+ var channel = new Channel(target, credentials, channelOptions);
+
+ return new ClientRunnerImpl(channel,
+ config.ClientType,
+ config.RpcType,
+ config.PayloadConfig,
+ config.HistogramParams);
}
}
- /// <summary>
- /// Client that starts synchronous unary calls in a closed loop.
- /// </summary>
- public class SyncUnaryClientRunner : IClientRunner
+ public class ClientRunnerImpl : IClientRunner
{
const double SecondsToNanos = 1e9;
readonly Channel channel;
- readonly int payloadSize;
+ readonly ClientType clientType;
+ readonly RpcType rpcType;
+ readonly PayloadConfig payloadConfig;
readonly Histogram histogram;
readonly BenchmarkService.IBenchmarkServiceClient client;
@@ -93,15 +117,19 @@ namespace Grpc.IntegrationTesting
readonly CancellationTokenSource stoppedCts;
readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
- public SyncUnaryClientRunner(Channel channel, int payloadSize, HistogramParams histogramParams)
+ public ClientRunnerImpl(Channel channel, ClientType clientType, RpcType rpcType, PayloadConfig payloadConfig, HistogramParams histogramParams)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
- this.payloadSize = payloadSize;
+ this.clientType = clientType;
+ this.rpcType = rpcType;
+ this.payloadConfig = payloadConfig;
this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
this.stoppedCts = new CancellationTokenSource();
this.client = BenchmarkService.NewClient(channel);
- this.runnerTask = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
+
+ var threadBody = GetThreadBody();
+ this.runnerTask = Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning);
}
public ClientStats GetStats(bool reset)
@@ -126,12 +154,9 @@ namespace Grpc.IntegrationTesting
await channel.ShutdownAsync();
}
- private void Run()
+ private void RunClosedLoopUnary()
{
- var request = new SimpleRequest
- {
- Payload = CreateZerosPayload(payloadSize)
- };
+ var request = CreateSimpleRequest();
var stopwatch = new Stopwatch();
while (!stoppedCts.Token.IsCancellationRequested)
@@ -145,6 +170,124 @@ namespace Grpc.IntegrationTesting
}
}
+ private async Task RunClosedLoopUnaryAsync()
+ {
+ var request = CreateSimpleRequest();
+ var stopwatch = new Stopwatch();
+
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await client.UnaryCallAsync(request);
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+ }
+
+ private async Task RunClosedLoopStreamingAsync()
+ {
+ var request = CreateSimpleRequest();
+ var stopwatch = new Stopwatch();
+
+ using (var call = client.StreamingCall())
+ {
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await call.RequestStream.WriteAsync(request);
+ await call.ResponseStream.MoveNext();
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+
+ // finish the streaming call
+ await call.RequestStream.CompleteAsync();
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+ }
+
+ private async Task RunGenericClosedLoopStreamingAsync()
+ {
+ var request = CreateByteBufferRequest();
+ var stopwatch = new Stopwatch();
+
+ var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
+
+ using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
+ {
+ while (!stoppedCts.Token.IsCancellationRequested)
+ {
+ stopwatch.Restart();
+ await call.RequestStream.WriteAsync(request);
+ await call.ResponseStream.MoveNext();
+ stopwatch.Stop();
+
+ // spec requires data point in nanoseconds.
+ histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
+ }
+
+ // finish the streaming call
+ await call.RequestStream.CompleteAsync();
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+ }
+
+ private Action GetThreadBody()
+ {
+ if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
+ {
+ GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
+ GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
+ return () =>
+ {
+ RunGenericClosedLoopStreamingAsync().Wait();
+ };
+ }
+
+ GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
+ if (clientType == ClientType.SYNC_CLIENT)
+ {
+ GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
+ return RunClosedLoopUnary;
+ }
+ else if (clientType == ClientType.ASYNC_CLIENT)
+ {
+ switch (rpcType)
+ {
+ case RpcType.UNARY:
+ return () =>
+ {
+ RunClosedLoopUnaryAsync().Wait();
+ };
+ case RpcType.STREAMING:
+ return () =>
+ {
+ RunClosedLoopStreamingAsync().Wait();
+ };
+ }
+ }
+ throw new ArgumentException("Unsupported configuration.");
+ }
+
+ private SimpleRequest CreateSimpleRequest()
+ {
+ GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
+ return new SimpleRequest
+ {
+ Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
+ ResponseSize = payloadConfig.SimpleParams.RespSize
+ };
+ }
+
+ private byte[] CreateByteBufferRequest()
+ {
+ return new byte[payloadConfig.BytebufParams.ReqSize];
+ }
+
private static Payload CreateZerosPayload(int size)
{
return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
new file mode 100644
index 0000000000..c6128264ac
--- /dev/null
+++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
@@ -0,0 +1,71 @@
+#region Copyright notice and license
+
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using System.Threading.Tasks;
+using Google.Protobuf;
+using Grpc.Core;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+using Grpc.Testing;
+
+namespace Grpc.IntegrationTesting
+{
+ /// <summary>
+ /// Utility methods for defining and calling a service that doesn't use protobufs
+ /// for serialization/deserialization.
+ /// </summary>
+ public static class GenericService
+ {
+ readonly static Marshaller<byte[]> ByteArrayMarshaller = new Marshaller<byte[]>((b) => b, (b) => b);
+
+ public readonly static Method<byte[], byte[]> StreamingCallMethod = new Method<byte[], byte[]>(
+ MethodType.DuplexStreaming,
+ "grpc.testing.BenchmarkService",
+ "StreamingCall",
+ ByteArrayMarshaller,
+ ByteArrayMarshaller
+ );
+
+ public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler)
+ {
+ return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName)
+ .AddMethod(StreamingCallMethod, handler).Build();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 372991374e..4c049944ea 100644
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -120,6 +120,7 @@
<Compile Include="WorkerServiceImpl.cs" />
<Compile Include="QpsWorker.cs" />
<Compile Include="WallClockStopwatch.cs" />
+ <Compile Include="GenericService.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
index 06d5ee93d8..a8cf75bd81 100644
--- a/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
+++ b/src/csharp/Grpc.IntegrationTesting/RunnerClientServerTest.cs
@@ -55,14 +55,7 @@ namespace Grpc.IntegrationTesting
{
var serverConfig = new ServerConfig
{
- ServerType = ServerType.ASYNC_SERVER,
- PayloadConfig = new PayloadConfig
- {
- SimpleParams = new SimpleProtoParams
- {
- RespSize = 100
- }
- }
+ ServerType = ServerType.ASYNC_SERVER
};
serverRunner = ServerRunners.CreateStarted(serverConfig);
}
@@ -88,7 +81,8 @@ namespace Grpc.IntegrationTesting
{
SimpleParams = new SimpleProtoParams
{
- ReqSize = 100
+ ReqSize = 100,
+ RespSize = 100
}
},
HistogramParams = new HistogramParams
diff --git a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
index 4a73645e6c..c326378cfa 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServerRunners.cs
@@ -41,6 +41,7 @@ using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
+using Grpc.Core.Logging;
using Grpc.Core.Utils;
using NUnit.Framework;
using Grpc.Testing;
@@ -50,27 +51,78 @@ namespace Grpc.IntegrationTesting
/// <summary>
/// Helper methods to start server runners for performance testing.
/// </summary>
- public static class ServerRunners
+ public class ServerRunners
{
+ static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ServerRunners>();
+
/// <summary>
/// Creates a started server runner.
/// </summary>
public static IServerRunner CreateStarted(ServerConfig config)
{
- GrpcPreconditions.CheckArgument(config.ServerType == ServerType.ASYNC_SERVER);
+ Logger.Debug("ServerConfig: {0}", config);
var credentials = config.SecurityParams != null ? TestCredentials.CreateSslServerCredentials() : ServerCredentials.Insecure;
- // TODO: qps_driver needs to setup payload properly...
- int responseSize = config.PayloadConfig != null ? config.PayloadConfig.SimpleParams.RespSize : 0;
+ if (config.AsyncServerThreads != 0)
+ {
+ Logger.Warning("ServerConfig.AsyncServerThreads is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreLimit != 0)
+ {
+ Logger.Warning("ServerConfig.CoreLimit is not supported for C#. Ignoring the value");
+ }
+ if (config.CoreList.Count > 0)
+ {
+ Logger.Warning("ServerConfig.CoreList is not supported for C#. Ignoring the value");
+ }
+
+ ServerServiceDefinition service = null;
+ if (config.ServerType == ServerType.ASYNC_SERVER)
+ {
+ GrpcPreconditions.CheckArgument(config.PayloadConfig == null,
+ "ServerConfig.PayloadConfig shouldn't be set for BenchmarkService based server.");
+ service = BenchmarkService.BindService(new BenchmarkServiceImpl());
+ }
+ else if (config.ServerType == ServerType.ASYNC_GENERIC_SERVER)
+ {
+ var genericService = new GenericServiceImpl(config.PayloadConfig.BytebufParams.RespSize);
+ service = GenericService.BindHandler(genericService.StreamingCall);
+ }
+ else
+ {
+ throw new ArgumentException("Unsupported ServerType");
+ }
+
var server = new Server
{
- Services = { BenchmarkService.BindService(new BenchmarkServiceImpl(responseSize)) },
+ Services = { service },
Ports = { new ServerPort("[::]", config.Port, credentials) }
};
server.Start();
return new ServerRunnerImpl(server);
}
+
+ private class GenericServiceImpl
+ {
+ readonly byte[] response;
+
+ public GenericServiceImpl(int responseSize)
+ {
+ this.response = new byte[responseSize];
+ }
+
+ /// <summary>
+ /// Generic streaming call handler.
+ /// </summary>
+ public async Task StreamingCall(IAsyncStreamReader<byte[]> requestStream, IServerStreamWriter<byte[]> responseStream, ServerCallContext context)
+ {
+ await requestStream.ForEachAsync(async request =>
+ {
+ await responseStream.WriteAsync(response);
+ });
+ }
+ }
}
/// <summary>
@@ -119,6 +171,5 @@ namespace Grpc.IntegrationTesting
{
return server.ShutdownAsync();
}
- }
-
+ }
}
diff --git a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
index 02871d5d02..4b92504b55 100644
--- a/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
+++ b/src/objective-c/GRPCClient/private/GRPCReachabilityFlagNames.xmacro.h
@@ -54,7 +54,9 @@
GRPC_XMACRO_ITEM.
#endif
+#if TARGET_OS_IPHONE
GRPC_XMACRO_ITEM(isCell, IsWWAN)
+#endif
GRPC_XMACRO_ITEM(reachable, Reachable)
GRPC_XMACRO_ITEM(transientConnection, TransientConnection)
GRPC_XMACRO_ITEM(connectionRequired, ConnectionRequired)