diff options
author | Sree Kuchibhotla <sreek@google.com> | 2016-04-26 11:30:52 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2016-04-26 11:30:52 -0700 |
commit | fd59e4e5f93b803487fafa5572be36ba50b76c9d (patch) | |
tree | f99c1e18db1bbd117c00fd90319db7007c558622 /src | |
parent | 3714e302c06a907b7af42a478beae3321b07c70a (diff) | |
parent | c3d869ef5853c4cfad57b7d3694d5260eeb7ce75 (diff) |
Merge branch 'master' into stress_test_misc
Diffstat (limited to 'src')
40 files changed, 1452 insertions, 165 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 93d54fdcfe..8a98a6bcbe 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -205,7 +205,11 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { + if (lb_policy != NULL) { + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); + } else if (chand->resolver == NULL /* disconnected */) { + grpc_closure_list_fail_all(&chand->waiting_for_config_closures); grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); } @@ -293,6 +297,11 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; + if (!chand->started_resolving) { + grpc_closure_list_fail_all(&chand->waiting_for_config_closures); + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -321,10 +330,10 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { continue_picking_args *cpa = arg; - if (!success) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); - } else if (cpa->connected_subchannel == NULL) { + if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ + } else if (!success) { + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { @@ -381,14 +390,19 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, &chand->incoming_configuration, &chand->on_config_changed); } - cpa = gpr_malloc(sizeof(*cpa)); - cpa->initial_metadata = initial_metadata; - cpa->initial_metadata_flags = initial_metadata_flags; - cpa->connected_subchannel = connected_subchannel; - cpa->on_ready = on_ready; - cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa); - grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1); + if (chand->resolver != NULL) { + cpa = gpr_malloc(sizeof(*cpa)); + cpa->initial_metadata = initial_metadata; + cpa->initial_metadata_flags = initial_metadata_flags; + cpa->connected_subchannel = connected_subchannel; + cpa->on_ready = on_ready; + cpa->elem = elem; + grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, + 1); + } else { + grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL); + } gpr_mu_unlock(&chand->mu_config); return 0; } diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 125a291f21..c925c28c67 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -135,8 +135,6 @@ struct grpc_subchannel { int have_alarm; /** our alarm */ grpc_timer alarm; - /** current random value */ - uint32_t random; }; struct grpc_subchannel_call { @@ -297,10 +295,6 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, } } -static uint32_t random_seed() { - return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); -} - grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args) { @@ -332,7 +326,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); - c->random = random_seed(); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 3db462b246..9918fbdcb4 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -252,9 +252,9 @@ char *grpc_subchannel_call_holder_get_peer( grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); - if (subchannel_call) { - return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); - } else { + if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { return NULL; + } else { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2749b0ca01..620ba4e2aa 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -86,7 +86,8 @@ typedef struct { static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_start_resolving_locked(dns_resolver *r); +static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r); static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); @@ -119,7 +120,7 @@ static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } gpr_mu_unlock(&r->mu); } @@ -134,7 +135,7 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, r->target_config = target_config; if (r->resolved_version == 0 && !r->resolving) { gpr_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } else { dns_maybe_finish_next_locked(exec_ctx, r); } @@ -149,7 +150,7 @@ static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, r->have_retry_timer = false; if (success) { if (!r->resolving) { - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } } gpr_mu_unlock(&r->mu); @@ -201,11 +202,12 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } -static void dns_start_resolving_locked(dns_resolver *r) { +static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); r->resolving = 1; - grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); + grpc_resolve_address(exec_ctx, r->name, r->default_port, dns_on_resolved, r); } static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c index 898632c3cd..deb4b9b1ef 100644 --- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c +++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c @@ -299,7 +299,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, address = zookeeper_parse_address(value, (size_t)value_len); if (address != NULL) { /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r); gpr_free(address); } else { gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); @@ -375,8 +375,10 @@ static void zookeeper_get_node_completion(int rc, const char *value, r->resolved_addrs->naddrs = 0; r->resolved_total = 1; /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r); gpr_free(address); + grpc_exec_ctx_finish(&exec_ctx); return; } diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 0ed115793b..c5d3d8d9cc 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -235,5 +235,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_exec_ctx_finish(&exec_ctx); - return channel; /* may be NULL */ + return channel != NULL ? channel : grpc_lame_client_channel_create( + target, GRPC_STATUS_INTERNAL, + "Failed to create client channel"); } diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.c b/src/core/ext/transport/chttp2/transport/bin_encoder.c index db68e750ac..1b43c28be1 100644 --- a/src/core/ext/transport/chttp2/transport/bin_encoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.c @@ -194,9 +194,13 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input) { /* encode full triplets */ for (i = 0; i < input_triplets; i++) { - enc_add2(&out, in[0] >> 2, (uint8_t)((in[0] & 0x3) << 4) | (in[1] >> 4)); - enc_add2(&out, (uint8_t)((in[1] & 0xf) << 2) | (in[2] >> 6), - (uint8_t)(in[2] & 0x3f)); + const uint8_t low_to_high = (uint8_t)((in[0] & 0x3) << 4); + const uint8_t high_to_low = in[1] >> 4; + enc_add2(&out, in[0] >> 2, low_to_high | high_to_low); + + const uint8_t a = (uint8_t)((in[1] & 0xf) << 2); + const uint8_t b = (in[2] >> 6); + enc_add2(&out, a | b, in[2] & 0x3f); in += 3; } @@ -208,12 +212,14 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input) { enc_add2(&out, in[0] >> 2, (uint8_t)((in[0] & 0x3) << 4)); in += 1; break; - case 2: - enc_add2(&out, in[0] >> 2, - (uint8_t)((in[0] & 0x3) << 4) | (uint8_t)(in[1] >> 4)); + case 2: { + const uint8_t low_to_high = (uint8_t)((in[0] & 0x3) << 4); + const uint8_t high_to_low = in[1] >> 4; + enc_add2(&out, in[0] >> 2, low_to_high | high_to_low); enc_add1(&out, (uint8_t)((in[1] & 0xf) << 2)); in += 2; break; + } } if (out.temp_length) { diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 93c3e6d8b4..687936bfd3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -639,7 +639,7 @@ static int on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md, } } if (p->on_header == NULL) { - grpc_mdelem_unref(md); + GRPC_MDELEM_UNREF(md); return 0; } p->on_header(p->on_header_user_data, md); diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 229fdb5ef6..3d42d0e616 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -268,8 +268,14 @@ static void init_channel_elem(grpc_exec_ctx *exec_ctx, channeld->default_compression_algorithm = grpc_channel_args_get_compression_algorithm(args->channel_args); /* Make sure the default isn't disabled. */ - GPR_ASSERT(grpc_compression_options_is_algorithm_enabled( - &channeld->compression_options, channeld->default_compression_algorithm)); + if (!grpc_compression_options_is_algorithm_enabled( + &channeld->compression_options, + channeld->default_compression_algorithm)) { + gpr_log(GPR_DEBUG, + "compression algorithm %d not enabled: switching to none", + channeld->default_compression_algorithm); + channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; + } channeld->compression_options.default_compression_algorithm = channeld->default_compression_algorithm; diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 76bd1b64dc..f22721ac8f 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -246,7 +246,7 @@ static void internal_request_begin( grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set, req->pollset); - grpc_resolve_address(request->host, req->handshaker->default_port, + grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port, on_resolved, req); } diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index 921c772453..a7efb5e73e 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -172,8 +172,8 @@ static int add_header(grpc_http_parser *parser) { while (cur != end && (*cur == ' ' || *cur == '\t')) { cur++; } - GPR_ASSERT(end - cur >= 2); - hdr.value = buf2str(cur, (size_t)(end - cur) - 2); + GPR_ASSERT((size_t)(end - cur) >= parser->cur_line_end_length); + hdr.value = buf2str(cur, (size_t)(end - cur) - parser->cur_line_end_length); if (parser->type == GRPC_HTTP_RESPONSE) { hdr_count = &parser->http.response.hdr_count; @@ -208,7 +208,7 @@ static int finish_line(grpc_http_parser *parser) { parser->state = GRPC_HTTP_HEADERS; break; case GRPC_HTTP_HEADERS: - if (parser->cur_line_length == 2) { + if (parser->cur_line_length == parser->cur_line_end_length) { parser->state = GRPC_HTTP_BODY; break; } @@ -248,6 +248,30 @@ static int addbyte_body(grpc_http_parser *parser, uint8_t byte) { return 1; } +static int check_line(grpc_http_parser *parser) { + if (parser->cur_line_length >= 2 && + parser->cur_line[parser->cur_line_length - 2] == '\r' && + parser->cur_line[parser->cur_line_length - 1] == '\n') { + return 1; + } + + // HTTP request with \n\r line termiantors. + else if (parser->cur_line_length >= 2 && + parser->cur_line[parser->cur_line_length - 2] == '\n' && + parser->cur_line[parser->cur_line_length - 1] == '\r') { + return 1; + } + + // HTTP request with only \n line terminators. + else if (parser->cur_line_length >= 1 && + parser->cur_line[parser->cur_line_length - 1] == '\n') { + parser->cur_line_end_length = 1; + return 1; + } + + return 0; +} + static int addbyte(grpc_http_parser *parser, uint8_t byte) { switch (parser->state) { case GRPC_HTTP_FIRST_LINE: @@ -260,9 +284,7 @@ static int addbyte(grpc_http_parser *parser, uint8_t byte) { } parser->cur_line[parser->cur_line_length] = byte; parser->cur_line_length++; - if (parser->cur_line_length >= 2 && - parser->cur_line[parser->cur_line_length - 2] == '\r' && - parser->cur_line[parser->cur_line_length - 1] == '\n') { + if (check_line(parser)) { return finish_line(parser); } else { return 1; @@ -278,6 +300,7 @@ void grpc_http_parser_init(grpc_http_parser *parser) { memset(parser, 0, sizeof(*parser)); parser->state = GRPC_HTTP_FIRST_LINE; parser->type = GRPC_HTTP_UNKNOWN; + parser->cur_line_end_length = 2; } void grpc_http_parser_destroy(grpc_http_parser *parser) { diff --git a/src/core/lib/http/parser.h b/src/core/lib/http/parser.h index 42fa5181b8..536637e9a2 100644 --- a/src/core/lib/http/parser.h +++ b/src/core/lib/http/parser.h @@ -105,6 +105,7 @@ typedef struct { uint8_t cur_line[GRPC_HTTP_PARSER_MAX_HEADER_LENGTH]; size_t cur_line_length; + size_t cur_line_end_length; } grpc_http_parser; void grpc_http_parser_init(grpc_http_parser *parser); diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index d6f073fc9d..27793c32e4 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -54,6 +54,12 @@ void grpc_closure_list_add(grpc_closure_list *closure_list, closure_list->tail = closure; } +void grpc_closure_list_fail_all(grpc_closure_list *list) { + for (grpc_closure *c = list->head; c != NULL; c = grpc_closure_next(c)) { + c->final_data &= ~(uintptr_t)1; + } +} + bool grpc_closure_list_empty(grpc_closure_list closure_list) { return closure_list.head == NULL; } diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 8652b53a8b..fdc2daed9d 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -86,6 +86,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, bool success); +/** force all success bits in \a list to false */ +void grpc_closure_list_fail_all(grpc_closure_list *list); + /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index e09ef02400..976cc40347 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -93,6 +93,8 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_workqueue *offload_target_or_null); void grpc_exec_ctx_global_init(void); + +void grpc_exec_ctx_global_init(void); void grpc_exec_ctx_global_shutdown(void); #endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */ diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index ecc06340a3..ef198fe0f6 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -59,8 +59,9 @@ typedef void (*grpc_resolve_cb)(grpc_exec_ctx *exec_ctx, void *arg, /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. */ /* TODO(ctiller): add a timeout here */ -void grpc_resolve_address(const char *addr, const char *default_port, - grpc_resolve_cb cb, void *arg); +extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr, + const char *default_port, + grpc_resolve_cb cb, void *arg); /* Destroy resolved addresses */ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index b9d3bbdb89..cae91eec20 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -164,8 +164,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { gpr_free(addrs); } -void grpc_resolve_address(const char *name, const char *default_port, - grpc_resolve_cb cb, void *arg) { +static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) { request *r = gpr_malloc(sizeof(request)); grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); @@ -175,4 +176,8 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_executor_enqueue(&r->request_closure, 1); } +void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) = resolve_address_impl; + #endif diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 82763d11f4..914736234d 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -155,8 +155,9 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { gpr_free(addrs); } -void grpc_resolve_address(const char *name, const char *default_port, - grpc_resolve_cb cb, void *arg) { +static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) { request *r = gpr_malloc(sizeof(request)); grpc_closure_init(&r->request_closure, do_request_thread, r); r->name = gpr_strdup(name); @@ -166,4 +167,8 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_executor_enqueue(&r->request_closure, 1); } +void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_resolve_cb cb, + void *arg) = resolve_address_impl; + #endif diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 6430cb629f..e93d5734a0 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -211,11 +211,11 @@ finish: grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL); } -void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_endpoint **ep, - grpc_pollset_set *interested_parties, - const struct sockaddr *addr, size_t addr_len, - gpr_timespec deadline) { +static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, + const struct sockaddr *addr, + size_t addr_len, gpr_timespec deadline) { int fd; grpc_dualstack_mode dsmode; int err; @@ -303,4 +303,19 @@ done: gpr_free(addr_str); } +// overridden by api_fuzzer.c +void (*grpc_tcp_client_connect_impl)( + grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, const struct sockaddr *addr, + size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl; + +void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_endpoint **ep, + grpc_pollset_set *interested_parties, + const struct sockaddr *addr, size_t addr_len, + gpr_timespec deadline) { + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, + addr_len, deadline); +} + #endif diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index 713f15b69e..acb5b26c87 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -70,6 +70,7 @@ static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; +static bool g_initialized = false; static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next, int success); @@ -83,6 +84,7 @@ static gpr_timespec compute_min_deadline(shard_type *shard) { void grpc_timer_list_init(gpr_timespec now) { uint32_t i; + g_initialized = true; gpr_mu_init(&g_mu); gpr_mu_init(&g_checker_mu); g_clock_type = now.clock_type; @@ -111,6 +113,7 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { } gpr_mu_destroy(&g_mu); gpr_mu_destroy(&g_checker_mu); + g_initialized = false; } /* This is a cheap, but good enough, pointer hash for sharding the tasks: */ @@ -180,6 +183,18 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->deadline = deadline; timer->triggered = 0; + if (!g_initialized) { + timer->triggered = 1; + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL); + return; + } + + if (gpr_time_cmp(deadline, now) <= 0) { + timer->triggered = 1; + grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, true, NULL); + return; + } + /* TODO(ctiller): check deadline expired */ gpr_mu_lock(&shard->mu); diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c index f5f62dadc6..11542072fe 100644 --- a/src/core/lib/support/time_posix.c +++ b/src/core/lib/support/time_posix.c @@ -78,7 +78,7 @@ static const clockid_t clockid_for_gpr_clock[] = {CLOCK_MONOTONIC, void gpr_time_init(void) { gpr_precise_clock_init(); } -gpr_timespec gpr_now(gpr_clock_type clock_type) { +static gpr_timespec now_impl(gpr_clock_type clock_type) { struct timespec now; GPR_ASSERT(clock_type != GPR_TIMESPAN); if (clock_type == GPR_CLOCK_PRECISE) { @@ -114,7 +114,7 @@ void gpr_time_init(void) { g_time_start = mach_absolute_time(); } -gpr_timespec gpr_now(gpr_clock_type clock) { +static gpr_timespec now_impl(gpr_clock_type clock) { gpr_timespec now; struct timeval now_tv; double now_dbl; @@ -142,6 +142,12 @@ gpr_timespec gpr_now(gpr_clock_type clock) { } #endif +gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl; + +gpr_timespec gpr_now(gpr_clock_type clock_type) { + return gpr_now_impl(clock_type); +} + void gpr_sleep_until(gpr_timespec until) { gpr_timespec now; gpr_timespec delta; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 6581bbd3d1..fa12b6ea61 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -142,22 +142,23 @@ struct grpc_call { gpr_mu mu; /* client or server call */ - uint8_t is_client; + bool is_client; /* is the alarm set */ - uint8_t have_alarm; + bool have_alarm; /** has grpc_call_destroy been called */ - uint8_t destroy_called; + bool destroy_called; /** flag indicating that cancellation is inherited */ - uint8_t cancellation_is_inherited; + bool cancellation_is_inherited; /** bitmask of live batches */ uint8_t used_batches; /** which ops are in-flight */ - uint8_t sent_initial_metadata; - uint8_t sending_message; - uint8_t sent_final_op; - uint8_t received_initial_metadata; - uint8_t receiving_message; - uint8_t received_final_op; + bool sent_initial_metadata; + bool sending_message; + bool sent_final_op; + bool received_initial_metadata; + bool receiving_message; + bool requested_final_op; + bool received_final_op; /* have we received initial metadata */ bool has_initial_md_been_received; @@ -220,10 +221,7 @@ struct grpc_call { } server; } final_op; - struct { - void *bctlp; - bool success; - } saved_receiving_stream_ready_ctx; + void *saved_receiving_stream_ready_bctlp; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -554,21 +552,6 @@ static int prepare_application_metadata(grpc_call *call, int count, int i; grpc_metadata_batch *batch = &call->metadata_batch[0 /* is_receiving */][is_trailing]; - if (prepend_extra_metadata) { - if (call->send_extra_metadata_count == 0) { - prepend_extra_metadata = 0; - } else { - for (i = 0; i < call->send_extra_metadata_count; i++) { - GRPC_MDELEM_REF(call->send_extra_metadata[i].md); - } - for (i = 1; i < call->send_extra_metadata_count; i++) { - call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; - } - for (i = 0; i < call->send_extra_metadata_count - 1; i++) { - call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; - } - } - } for (i = 0; i < count; i++) { grpc_metadata *md = &metadata[i]; grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; @@ -579,14 +562,37 @@ static int prepare_application_metadata(grpc_call *call, int count, GRPC_MDSTR_LENGTH(l->md->key))) { gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s", grpc_mdstr_as_c_string(l->md->key)); - return 0; + break; } else if (!grpc_is_binary_header(grpc_mdstr_as_c_string(l->md->key), GRPC_MDSTR_LENGTH(l->md->key)) && !grpc_header_nonbin_value_is_legal( grpc_mdstr_as_c_string(l->md->value), GRPC_MDSTR_LENGTH(l->md->value))) { gpr_log(GPR_ERROR, "attempt to send invalid metadata value"); - return 0; + break; + } + } + if (i != count) { + for (int j = 0; j <= i; j++) { + grpc_metadata *md = &metadata[j]; + grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data; + GRPC_MDELEM_UNREF(l->md); + } + return 0; + } + if (prepend_extra_metadata) { + if (call->send_extra_metadata_count == 0) { + prepend_extra_metadata = 0; + } else { + for (i = 0; i < call->send_extra_metadata_count; i++) { + GRPC_MDELEM_REF(call->send_extra_metadata[i].md); + } + for (i = 1; i < call->send_extra_metadata_count; i++) { + call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1]; + } + for (i = 0; i < call->send_extra_metadata_count - 1; i++) { + call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1]; + } } } for (i = 1; i < count; i++) { @@ -1057,12 +1063,12 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_call *call = bctl->call; gpr_mu_lock(&bctl->call->mu); - if (bctl->call->has_initial_md_been_received) { + if (bctl->call->has_initial_md_been_received || !success || + call->receiving_stream == NULL) { gpr_mu_unlock(&bctl->call->mu); process_data_after_md(exec_ctx, bctlp, success); } else { - call->saved_receiving_stream_ready_ctx.bctlp = bctlp; - call->saved_receiving_stream_ready_ctx.success = success; + call->saved_receiving_stream_ready_bctlp = bctlp; gpr_mu_unlock(&bctl->call->mu); } } @@ -1091,13 +1097,11 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } call->has_initial_md_been_received = true; - if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) { + if (call->saved_receiving_stream_ready_bctlp != NULL) { grpc_closure *saved_rsr_closure = grpc_closure_create( - receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp); - grpc_exec_ctx_enqueue( - exec_ctx, saved_rsr_closure, - call->saved_receiving_stream_ready_ctx.success && success, NULL); - call->saved_receiving_stream_ready_ctx.bctlp = NULL; + receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); + call->saved_receiving_stream_ready_bctlp = NULL; + grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure, success, NULL); } gpr_mu_unlock(&call->mu); @@ -1133,6 +1137,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) { &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; grpc_metadata_batch_filter(md, recv_trailing_filter, call); + call->received_final_op = true; if (call->have_alarm) { grpc_timer_cancel(exec_ctx, &call->alarm); } @@ -1377,11 +1382,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_NOT_ON_SERVER; goto done_with_error; } - if (call->received_final_op) { + if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->received_final_op = 1; + call->requested_final_op = 1; call->buffered_metadata[1] = op->data.recv_status_on_client.trailing_metadata; call->final_op.client.status = op->data.recv_status_on_client.status; @@ -1404,11 +1409,11 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, error = GRPC_CALL_ERROR_NOT_ON_CLIENT; goto done_with_error; } - if (call->received_final_op) { + if (call->requested_final_op) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } - call->received_final_op = 1; + call->requested_final_op = 1; call->final_op.server.cancelled = op->data.recv_close_on_server.cancelled; bctl->recv_final_op = 1; @@ -1457,7 +1462,7 @@ done_with_error: call->receiving_message = 0; } if (bctl->recv_final_op) { - call->received_final_op = 0; + call->requested_final_op = 0; } gpr_mu_unlock(&call->mu); goto done; diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index c1f6812c4e..80bd95df68 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -99,6 +99,9 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_consumed != NULL) { op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 1); } + if (op->send_ping != NULL) { + op->send_ping->cb(exec_ctx, op->send_ping->cb_arg, 0); + } } static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, diff --git a/src/core/lib/surface/validate_metadata.c b/src/core/lib/surface/validate_metadata.c index bf4126867f..84f0a083bc 100644 --- a/src/core/lib/surface/validate_metadata.c +++ b/src/core/lib/surface/validate_metadata.c @@ -40,7 +40,7 @@ static int conforms_to(const char *s, size_t len, const uint8_t *legal_bits) { const char *p = s; const char *e = s + len; for (; p != e; p++) { - int idx = *p; + int idx = (uint8_t)*p; int byte = idx / 8; int bit = idx % 8; if ((legal_bits[byte] & (1 << bit)) == 0) return 0; diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index e29e8df2c9..713d9e6782 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -120,6 +120,7 @@ void grpc_mdelem_set_user_data(grpc_mdelem *md, void (*destroy_func)(void *), void *user_data); /* Reference counting */ +//#define GRPC_METADATA_REFCOUNT_DEBUG #ifdef GRPC_METADATA_REFCOUNT_DEBUG #define GRPC_MDSTR_REF(s) grpc_mdstr_ref((s), __FILE__, __LINE__) #define GRPC_MDSTR_UNREF(s) grpc_mdstr_unref((s), __FILE__, __LINE__) diff --git a/src/csharp/Grpc.Core.Tests/CallOptionsTest.cs b/src/csharp/Grpc.Core.Tests/CallOptionsTest.cs index a3a613be74..99a2d47e6e 100644 --- a/src/csharp/Grpc.Core.Tests/CallOptionsTest.cs +++ b/src/csharp/Grpc.Core.Tests/CallOptionsTest.cs @@ -54,10 +54,20 @@ namespace Grpc.Core.Tests var deadline = DateTime.UtcNow; Assert.AreEqual(deadline, options.WithDeadline(deadline).Deadline.Value); - var token = new CancellationTokenSource().Token; - Assert.AreEqual(token, options.WithCancellationToken(token).CancellationToken); + var cancellationToken = new CancellationTokenSource().Token; + Assert.AreEqual(cancellationToken, options.WithCancellationToken(cancellationToken).CancellationToken); + + var writeOptions = new WriteOptions(); + Assert.AreSame(writeOptions, options.WithWriteOptions(writeOptions).WriteOptions); + + var propagationToken = new ContextPropagationToken(CallSafeHandle.NullInstance, DateTime.UtcNow, + CancellationToken.None, ContextPropagationOptions.Default); + Assert.AreSame(propagationToken, options.WithPropagationToken(propagationToken).PropagationToken); + + var credentials = new FakeCallCredentials(); + Assert.AreSame(credentials, options.WithCredentials(credentials).Credentials); - // Change original instance is unchanged. + // Check that the original instance is unchanged. Assert.IsNull(options.Headers); Assert.IsNull(options.Deadline); Assert.AreEqual(CancellationToken.None, options.CancellationToken); diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs index caf8210d91..9ca88849ee 100644 --- a/src/csharp/Grpc.Core/CallOptions.cs +++ b/src/csharp/Grpc.Core/CallOptions.cs @@ -100,10 +100,7 @@ namespace Grpc.Core /// </summary> public WriteOptions WriteOptions { - get - { - return this.writeOptions; - } + get { return this.writeOptions; } } /// <summary> @@ -111,10 +108,7 @@ namespace Grpc.Core /// </summary> public ContextPropagationToken PropagationToken { - get - { - return this.propagationToken; - } + get { return this.propagationToken; } } /// <summary> @@ -122,10 +116,7 @@ namespace Grpc.Core /// </summary> public CallCredentials Credentials { - get - { - return this.credentials; - } + get { return this.credentials; } } /// <summary> @@ -165,6 +156,42 @@ namespace Grpc.Core } /// <summary> + /// Returns new instance of <see cref="CallOptions"/> with + /// <c>WriteOptions</c> set to the value provided. Values of all other fields are preserved. + /// </summary> + /// <param name="writeOptions">The write options.</param> + public CallOptions WithWriteOptions(WriteOptions writeOptions) + { + var newOptions = this; + newOptions.writeOptions = writeOptions; + return newOptions; + } + + /// <summary> + /// Returns new instance of <see cref="CallOptions"/> with + /// <c>PropagationToken</c> set to the value provided. Values of all other fields are preserved. + /// </summary> + /// <param name="propagationToken">The context propagation token.</param> + public CallOptions WithPropagationToken(ContextPropagationToken propagationToken) + { + var newOptions = this; + newOptions.propagationToken = propagationToken; + return newOptions; + } + + /// <summary> + /// Returns new instance of <see cref="CallOptions"/> with + /// <c>Credentials</c> set to the value provided. Values of all other fields are preserved. + /// </summary> + /// <param name="credentials">The call credentials.</param> + public CallOptions WithCredentials(CallCredentials credentials) + { + var newOptions = this; + newOptions.credentials = credentials; + return newOptions; + } + + /// <summary> /// Returns a new instance of <see cref="CallOptions"/> with /// all previously unset values set to their defaults and deadline and cancellation /// token propagated when appropriate. diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/.gitignore b/src/csharp/Grpc.IntegrationTesting.StressClient/.gitignore new file mode 100644 index 0000000000..a382af2294 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.StressClient/.gitignore @@ -0,0 +1,3 @@ +bin +obj + diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj b/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj new file mode 100644 index 0000000000..d6eba74289 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj @@ -0,0 +1,60 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{ADEBA147-80AE-4710-82E9-5B7F93690266}</ProjectGuid> + <OutputType>Exe</OutputType> + <RootNamespace>Grpc.IntegrationTesting.StressClient</RootNamespace> + <AssemblyName>Grpc.IntegrationTesting.StressClient</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\Debug</OutputPath> + <DefineConstants>DEBUG;</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <PlatformTarget>AnyCPU</PlatformTarget> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>bin\Release</OutputPath> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <PlatformTarget>AnyCPU</PlatformTarget> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'ReleaseSigned|AnyCPU' "> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>bin\ReleaseSigned</OutputPath> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + <SignAssembly>True</SignAssembly> + <AssemblyOriginatorKeyFile>..\keys\Grpc.snk</AssemblyOriginatorKeyFile> + </PropertyGroup> + <ItemGroup> + <Reference Include="System" /> + </ItemGroup> + <ItemGroup> + <Compile Include="..\Grpc.Core\Version.cs"> + <Link>Version.cs</Link> + </Compile> + <Compile Include="Program.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + </ItemGroup> + <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> + <ItemGroup> + <ProjectReference Include="..\Grpc.Core\Grpc.Core.csproj"> + <Project>{CCC4440E-49F7-4790-B0AF-FEABB0837AE7}</Project> + <Name>Grpc.Core</Name> + </ProjectReference> + <ProjectReference Include="..\Grpc.IntegrationTesting\Grpc.IntegrationTesting.csproj"> + <Project>{C61154BA-DD4A-4838-8420-0162A28925E0}</Project> + <Name>Grpc.IntegrationTesting</Name> + </ProjectReference> + </ItemGroup> +</Project>
\ No newline at end of file diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/Program.cs b/src/csharp/Grpc.IntegrationTesting.StressClient/Program.cs new file mode 100644 index 0000000000..dffdf22fa5 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.StressClient/Program.cs @@ -0,0 +1,45 @@ +#region Copyright notice and license + +// Copyright 2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; + +namespace Grpc.IntegrationTesting.StressClient +{ + class MainClass + { + public static void Main(string[] args) + { + StressTestClient.Run(args); + } + } +} diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/Properties/AssemblyInfo.cs b/src/csharp/Grpc.IntegrationTesting.StressClient/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..e845bbfb9e --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting.StressClient/Properties/AssemblyInfo.cs @@ -0,0 +1,11 @@ +using System.Reflection; +using System.Runtime.CompilerServices; + +[assembly: AssemblyTitle("Grpc.IntegrationTesting.StressClient")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("Google Inc. All rights reserved.")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] diff --git a/src/csharp/Grpc.IntegrationTesting/Control.cs b/src/csharp/Grpc.IntegrationTesting/Control.cs index 003d2428fa..3fa8d43f38 100644 --- a/src/csharp/Grpc.IntegrationTesting/Control.cs +++ b/src/csharp/Grpc.IntegrationTesting/Control.cs @@ -31,7 +31,7 @@ namespace Grpc.Testing { "cnBjLnRlc3RpbmcuQ2xvc2VkTG9vcFBhcmFtc0gAEi4KB3BvaXNzb24YAiAB", "KAsyGy5ncnBjLnRlc3RpbmcuUG9pc3NvblBhcmFtc0gAQgYKBGxvYWQiQwoO", "U2VjdXJpdHlQYXJhbXMSEwoLdXNlX3Rlc3RfY2EYASABKAgSHAoUc2VydmVy", - "X2hvc3Rfb3ZlcnJpZGUYAiABKAki1gMKDENsaWVudENvbmZpZxIWCg5zZXJ2", + "X2hvc3Rfb3ZlcnJpZGUYAiABKAki8AMKDENsaWVudENvbmZpZxIWCg5zZXJ2", "ZXJfdGFyZ2V0cxgBIAMoCRItCgtjbGllbnRfdHlwZRgCIAEoDjIYLmdycGMu", "dGVzdGluZy5DbGllbnRUeXBlEjUKD3NlY3VyaXR5X3BhcmFtcxgDIAEoCzIc", "LmdycGMudGVzdGluZy5TZWN1cml0eVBhcmFtcxIkChxvdXRzdGFuZGluZ19y", @@ -41,46 +41,48 @@ namespace Grpc.Testing { "ASgLMhguZ3JwYy50ZXN0aW5nLkxvYWRQYXJhbXMSMwoOcGF5bG9hZF9jb25m", "aWcYCyABKAsyGy5ncnBjLnRlc3RpbmcuUGF5bG9hZENvbmZpZxI3ChBoaXN0", "b2dyYW1fcGFyYW1zGAwgASgLMh0uZ3JwYy50ZXN0aW5nLkhpc3RvZ3JhbVBh", - "cmFtcxIRCgljb3JlX2xpc3QYDSADKAUSEgoKY29yZV9saW1pdBgOIAEoBSI4", - "CgxDbGllbnRTdGF0dXMSKAoFc3RhdHMYASABKAsyGS5ncnBjLnRlc3Rpbmcu", - "Q2xpZW50U3RhdHMiFQoETWFyaxINCgVyZXNldBgBIAEoCCJoCgpDbGllbnRB", - "cmdzEisKBXNldHVwGAEgASgLMhouZ3JwYy50ZXN0aW5nLkNsaWVudENvbmZp", - "Z0gAEiIKBG1hcmsYAiABKAsyEi5ncnBjLnRlc3RpbmcuTWFya0gAQgkKB2Fy", - "Z3R5cGUi/AEKDFNlcnZlckNvbmZpZxItCgtzZXJ2ZXJfdHlwZRgBIAEoDjIY", - "LmdycGMudGVzdGluZy5TZXJ2ZXJUeXBlEjUKD3NlY3VyaXR5X3BhcmFtcxgC", - "IAEoCzIcLmdycGMudGVzdGluZy5TZWN1cml0eVBhcmFtcxIMCgRwb3J0GAQg", - "ASgFEhwKFGFzeW5jX3NlcnZlcl90aHJlYWRzGAcgASgFEhIKCmNvcmVfbGlt", - "aXQYCCABKAUSMwoOcGF5bG9hZF9jb25maWcYCSABKAsyGy5ncnBjLnRlc3Rp", - "bmcuUGF5bG9hZENvbmZpZxIRCgljb3JlX2xpc3QYCiADKAUiaAoKU2VydmVy", - "QXJncxIrCgVzZXR1cBgBIAEoCzIaLmdycGMudGVzdGluZy5TZXJ2ZXJDb25m", - "aWdIABIiCgRtYXJrGAIgASgLMhIuZ3JwYy50ZXN0aW5nLk1hcmtIAEIJCgdh", - "cmd0eXBlIlUKDFNlcnZlclN0YXR1cxIoCgVzdGF0cxgBIAEoCzIZLmdycGMu", - "dGVzdGluZy5TZXJ2ZXJTdGF0cxIMCgRwb3J0GAIgASgFEg0KBWNvcmVzGAMg", - "ASgFIg0KC0NvcmVSZXF1ZXN0Ih0KDENvcmVSZXNwb25zZRINCgVjb3JlcxgB", - "IAEoBSIGCgRWb2lkIv0BCghTY2VuYXJpbxIMCgRuYW1lGAEgASgJEjEKDWNs", - "aWVudF9jb25maWcYAiABKAsyGi5ncnBjLnRlc3RpbmcuQ2xpZW50Q29uZmln", - "EhMKC251bV9jbGllbnRzGAMgASgFEjEKDXNlcnZlcl9jb25maWcYBCABKAsy", - "Gi5ncnBjLnRlc3RpbmcuU2VydmVyQ29uZmlnEhMKC251bV9zZXJ2ZXJzGAUg", - "ASgFEhYKDndhcm11cF9zZWNvbmRzGAYgASgFEhkKEWJlbmNobWFya19zZWNv", - "bmRzGAcgASgFEiAKGHNwYXduX2xvY2FsX3dvcmtlcl9jb3VudBgIIAEoBSI2", - "CglTY2VuYXJpb3MSKQoJc2NlbmFyaW9zGAEgAygLMhYuZ3JwYy50ZXN0aW5n", - "LlNjZW5hcmlvIpICChVTY2VuYXJpb1Jlc3VsdFN1bW1hcnkSCwoDcXBzGAEg", - "ASgBEhsKE3Fwc19wZXJfc2VydmVyX2NvcmUYAiABKAESGgoSc2VydmVyX3N5", - "c3RlbV90aW1lGAMgASgBEhgKEHNlcnZlcl91c2VyX3RpbWUYBCABKAESGgoS", - "Y2xpZW50X3N5c3RlbV90aW1lGAUgASgBEhgKEGNsaWVudF91c2VyX3RpbWUY", - "BiABKAESEgoKbGF0ZW5jeV81MBgHIAEoARISCgpsYXRlbmN5XzkwGAggASgB", - "EhIKCmxhdGVuY3lfOTUYCSABKAESEgoKbGF0ZW5jeV85ORgKIAEoARITCgts", - "YXRlbmN5Xzk5ORgLIAEoASKYAgoOU2NlbmFyaW9SZXN1bHQSKAoIc2NlbmFy", - "aW8YASABKAsyFi5ncnBjLnRlc3RpbmcuU2NlbmFyaW8SLgoJbGF0ZW5jaWVz", - "GAIgASgLMhsuZ3JwYy50ZXN0aW5nLkhpc3RvZ3JhbURhdGESLwoMY2xpZW50", - "X3N0YXRzGAMgAygLMhkuZ3JwYy50ZXN0aW5nLkNsaWVudFN0YXRzEi8KDHNl", - "cnZlcl9zdGF0cxgEIAMoCzIZLmdycGMudGVzdGluZy5TZXJ2ZXJTdGF0cxIU", - "CgxzZXJ2ZXJfY29yZXMYBSADKAUSNAoHc3VtbWFyeRgGIAEoCzIjLmdycGMu", - "dGVzdGluZy5TY2VuYXJpb1Jlc3VsdFN1bW1hcnkqLwoKQ2xpZW50VHlwZRIP", - "CgtTWU5DX0NMSUVOVBAAEhAKDEFTWU5DX0NMSUVOVBABKkkKClNlcnZlclR5", - "cGUSDwoLU1lOQ19TRVJWRVIQABIQCgxBU1lOQ19TRVJWRVIQARIYChRBU1lO", - "Q19HRU5FUklDX1NFUlZFUhACKiMKB1JwY1R5cGUSCQoFVU5BUlkQABINCglT", - "VFJFQU1JTkcQAWIGcHJvdG8z")); + "cmFtcxIRCgljb3JlX2xpc3QYDSADKAUSEgoKY29yZV9saW1pdBgOIAEoBRIY", + "ChBvdGhlcl9jbGllbnRfYXBpGA8gASgJIjgKDENsaWVudFN0YXR1cxIoCgVz", + "dGF0cxgBIAEoCzIZLmdycGMudGVzdGluZy5DbGllbnRTdGF0cyIVCgRNYXJr", + "Eg0KBXJlc2V0GAEgASgIImgKCkNsaWVudEFyZ3MSKwoFc2V0dXAYASABKAsy", + "Gi5ncnBjLnRlc3RpbmcuQ2xpZW50Q29uZmlnSAASIgoEbWFyaxgCIAEoCzIS", + "LmdycGMudGVzdGluZy5NYXJrSABCCQoHYXJndHlwZSKWAgoMU2VydmVyQ29u", + "ZmlnEi0KC3NlcnZlcl90eXBlGAEgASgOMhguZ3JwYy50ZXN0aW5nLlNlcnZl", + "clR5cGUSNQoPc2VjdXJpdHlfcGFyYW1zGAIgASgLMhwuZ3JwYy50ZXN0aW5n", + "LlNlY3VyaXR5UGFyYW1zEgwKBHBvcnQYBCABKAUSHAoUYXN5bmNfc2VydmVy", + "X3RocmVhZHMYByABKAUSEgoKY29yZV9saW1pdBgIIAEoBRIzCg5wYXlsb2Fk", + "X2NvbmZpZxgJIAEoCzIbLmdycGMudGVzdGluZy5QYXlsb2FkQ29uZmlnEhEK", + "CWNvcmVfbGlzdBgKIAMoBRIYChBvdGhlcl9zZXJ2ZXJfYXBpGAsgASgJImgK", + "ClNlcnZlckFyZ3MSKwoFc2V0dXAYASABKAsyGi5ncnBjLnRlc3RpbmcuU2Vy", + "dmVyQ29uZmlnSAASIgoEbWFyaxgCIAEoCzISLmdycGMudGVzdGluZy5NYXJr", + "SABCCQoHYXJndHlwZSJVCgxTZXJ2ZXJTdGF0dXMSKAoFc3RhdHMYASABKAsy", + "GS5ncnBjLnRlc3RpbmcuU2VydmVyU3RhdHMSDAoEcG9ydBgCIAEoBRINCgVj", + "b3JlcxgDIAEoBSINCgtDb3JlUmVxdWVzdCIdCgxDb3JlUmVzcG9uc2USDQoF", + "Y29yZXMYASABKAUiBgoEVm9pZCL9AQoIU2NlbmFyaW8SDAoEbmFtZRgBIAEo", + "CRIxCg1jbGllbnRfY29uZmlnGAIgASgLMhouZ3JwYy50ZXN0aW5nLkNsaWVu", + "dENvbmZpZxITCgtudW1fY2xpZW50cxgDIAEoBRIxCg1zZXJ2ZXJfY29uZmln", + "GAQgASgLMhouZ3JwYy50ZXN0aW5nLlNlcnZlckNvbmZpZxITCgtudW1fc2Vy", + "dmVycxgFIAEoBRIWCg53YXJtdXBfc2Vjb25kcxgGIAEoBRIZChFiZW5jaG1h", + "cmtfc2Vjb25kcxgHIAEoBRIgChhzcGF3bl9sb2NhbF93b3JrZXJfY291bnQY", + "CCABKAUiNgoJU2NlbmFyaW9zEikKCXNjZW5hcmlvcxgBIAMoCzIWLmdycGMu", + "dGVzdGluZy5TY2VuYXJpbyKSAgoVU2NlbmFyaW9SZXN1bHRTdW1tYXJ5EgsK", + "A3FwcxgBIAEoARIbChNxcHNfcGVyX3NlcnZlcl9jb3JlGAIgASgBEhoKEnNl", + "cnZlcl9zeXN0ZW1fdGltZRgDIAEoARIYChBzZXJ2ZXJfdXNlcl90aW1lGAQg", + "ASgBEhoKEmNsaWVudF9zeXN0ZW1fdGltZRgFIAEoARIYChBjbGllbnRfdXNl", + "cl90aW1lGAYgASgBEhIKCmxhdGVuY3lfNTAYByABKAESEgoKbGF0ZW5jeV85", + "MBgIIAEoARISCgpsYXRlbmN5Xzk1GAkgASgBEhIKCmxhdGVuY3lfOTkYCiAB", + "KAESEwoLbGF0ZW5jeV85OTkYCyABKAEimAIKDlNjZW5hcmlvUmVzdWx0EigK", + "CHNjZW5hcmlvGAEgASgLMhYuZ3JwYy50ZXN0aW5nLlNjZW5hcmlvEi4KCWxh", + "dGVuY2llcxgCIAEoCzIbLmdycGMudGVzdGluZy5IaXN0b2dyYW1EYXRhEi8K", + "DGNsaWVudF9zdGF0cxgDIAMoCzIZLmdycGMudGVzdGluZy5DbGllbnRTdGF0", + "cxIvCgxzZXJ2ZXJfc3RhdHMYBCADKAsyGS5ncnBjLnRlc3RpbmcuU2VydmVy", + "U3RhdHMSFAoMc2VydmVyX2NvcmVzGAUgAygFEjQKB3N1bW1hcnkYBiABKAsy", + "Iy5ncnBjLnRlc3RpbmcuU2NlbmFyaW9SZXN1bHRTdW1tYXJ5KkEKCkNsaWVu", + "dFR5cGUSDwoLU1lOQ19DTElFTlQQABIQCgxBU1lOQ19DTElFTlQQARIQCgxP", + "VEhFUl9DTElFTlQQAipbCgpTZXJ2ZXJUeXBlEg8KC1NZTkNfU0VSVkVSEAAS", + "EAoMQVNZTkNfU0VSVkVSEAESGAoUQVNZTkNfR0VORVJJQ19TRVJWRVIQAhIQ", + "CgxPVEhFUl9TRVJWRVIQAyojCgdScGNUeXBlEgkKBVVOQVJZEAASDQoJU1RS", + "RUFNSU5HEAFiBnByb3RvMw==")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Grpc.Testing.PayloadsReflection.Descriptor, global::Grpc.Testing.StatsReflection.Descriptor, }, new pbr::GeneratedCodeInfo(new[] {typeof(global::Grpc.Testing.ClientType), typeof(global::Grpc.Testing.ServerType), typeof(global::Grpc.Testing.RpcType), }, new pbr::GeneratedCodeInfo[] { @@ -88,11 +90,11 @@ namespace Grpc.Testing { new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClosedLoopParams), global::Grpc.Testing.ClosedLoopParams.Parser, null, null, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.LoadParams), global::Grpc.Testing.LoadParams.Parser, new[]{ "ClosedLoop", "Poisson" }, new[]{ "Load" }, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.SecurityParams), global::Grpc.Testing.SecurityParams.Parser, new[]{ "UseTestCa", "ServerHostOverride" }, null, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientConfig), global::Grpc.Testing.ClientConfig.Parser, new[]{ "ServerTargets", "ClientType", "SecurityParams", "OutstandingRpcsPerChannel", "ClientChannels", "AsyncClientThreads", "RpcType", "LoadParams", "PayloadConfig", "HistogramParams", "CoreList", "CoreLimit" }, null, null, null), + new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientConfig), global::Grpc.Testing.ClientConfig.Parser, new[]{ "ServerTargets", "ClientType", "SecurityParams", "OutstandingRpcsPerChannel", "ClientChannels", "AsyncClientThreads", "RpcType", "LoadParams", "PayloadConfig", "HistogramParams", "CoreList", "CoreLimit", "OtherClientApi" }, null, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientStatus), global::Grpc.Testing.ClientStatus.Parser, new[]{ "Stats" }, null, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.Mark), global::Grpc.Testing.Mark.Parser, new[]{ "Reset" }, null, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ClientArgs), global::Grpc.Testing.ClientArgs.Parser, new[]{ "Setup", "Mark" }, new[]{ "Argtype" }, null, null), - new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerConfig), global::Grpc.Testing.ServerConfig.Parser, new[]{ "ServerType", "SecurityParams", "Port", "AsyncServerThreads", "CoreLimit", "PayloadConfig", "CoreList" }, null, null, null), + new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerConfig), global::Grpc.Testing.ServerConfig.Parser, new[]{ "ServerType", "SecurityParams", "Port", "AsyncServerThreads", "CoreLimit", "PayloadConfig", "CoreList", "OtherServerApi" }, null, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerArgs), global::Grpc.Testing.ServerArgs.Parser, new[]{ "Setup", "Mark" }, new[]{ "Argtype" }, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.ServerStatus), global::Grpc.Testing.ServerStatus.Parser, new[]{ "Stats", "Port", "Cores" }, null, null, null), new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.CoreRequest), global::Grpc.Testing.CoreRequest.Parser, null, null, null, null), @@ -109,14 +111,26 @@ namespace Grpc.Testing { } #region Enums public enum ClientType { + /// <summary> + /// Many languages support a basic distinction between using + /// sync or async client, and this allows the specification + /// </summary> SYNC_CLIENT = 0, ASYNC_CLIENT = 1, + /// <summary> + /// used for some language-specific variants + /// </summary> + OTHER_CLIENT = 2, } public enum ServerType { SYNC_SERVER = 0, ASYNC_SERVER = 1, ASYNC_GENERIC_SERVER = 2, + /// <summary> + /// used for some language-specific variants + /// </summary> + OTHER_SERVER = 3, } public enum RpcType { @@ -651,6 +665,7 @@ namespace Grpc.Testing { HistogramParams = other.histogramParams_ != null ? other.HistogramParams.Clone() : null; coreList_ = other.coreList_.Clone(); coreLimit_ = other.coreLimit_; + otherClientApi_ = other.otherClientApi_; } public ClientConfig Clone() { @@ -795,6 +810,19 @@ namespace Grpc.Testing { } } + /// <summary>Field number for the "other_client_api" field.</summary> + public const int OtherClientApiFieldNumber = 15; + private string otherClientApi_ = ""; + /// <summary> + /// If we use an OTHER_CLIENT client_type, this string gives more detail + /// </summary> + public string OtherClientApi { + get { return otherClientApi_; } + set { + otherClientApi_ = pb::Preconditions.CheckNotNull(value, "value"); + } + } + public override bool Equals(object other) { return Equals(other as ClientConfig); } @@ -818,6 +846,7 @@ namespace Grpc.Testing { if (!object.Equals(HistogramParams, other.HistogramParams)) return false; if(!coreList_.Equals(other.coreList_)) return false; if (CoreLimit != other.CoreLimit) return false; + if (OtherClientApi != other.OtherClientApi) return false; return true; } @@ -835,6 +864,7 @@ namespace Grpc.Testing { if (histogramParams_ != null) hash ^= HistogramParams.GetHashCode(); hash ^= coreList_.GetHashCode(); if (CoreLimit != 0) hash ^= CoreLimit.GetHashCode(); + if (OtherClientApi.Length != 0) hash ^= OtherClientApi.GetHashCode(); return hash; } @@ -885,6 +915,10 @@ namespace Grpc.Testing { output.WriteRawTag(112); output.WriteInt32(CoreLimit); } + if (OtherClientApi.Length != 0) { + output.WriteRawTag(122); + output.WriteString(OtherClientApi); + } } public int CalculateSize() { @@ -921,6 +955,9 @@ namespace Grpc.Testing { if (CoreLimit != 0) { size += 1 + pb::CodedOutputStream.ComputeInt32Size(CoreLimit); } + if (OtherClientApi.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(OtherClientApi); + } return size; } @@ -972,6 +1009,9 @@ namespace Grpc.Testing { if (other.CoreLimit != 0) { CoreLimit = other.CoreLimit; } + if (other.OtherClientApi.Length != 0) { + OtherClientApi = other.OtherClientApi; + } } public void MergeFrom(pb::CodedInputStream input) { @@ -1042,6 +1082,10 @@ namespace Grpc.Testing { CoreLimit = input.ReadInt32(); break; } + case 122: { + OtherClientApi = input.ReadString(); + break; + } } } } @@ -1462,6 +1506,7 @@ namespace Grpc.Testing { coreLimit_ = other.coreLimit_; PayloadConfig = other.payloadConfig_ != null ? other.PayloadConfig.Clone() : null; coreList_ = other.coreList_.Clone(); + otherServerApi_ = other.otherServerApi_; } public ServerConfig Clone() { @@ -1552,6 +1597,19 @@ namespace Grpc.Testing { get { return coreList_; } } + /// <summary>Field number for the "other_server_api" field.</summary> + public const int OtherServerApiFieldNumber = 11; + private string otherServerApi_ = ""; + /// <summary> + /// If we use an OTHER_SERVER client_type, this string gives more detail + /// </summary> + public string OtherServerApi { + get { return otherServerApi_; } + set { + otherServerApi_ = pb::Preconditions.CheckNotNull(value, "value"); + } + } + public override bool Equals(object other) { return Equals(other as ServerConfig); } @@ -1570,6 +1628,7 @@ namespace Grpc.Testing { if (CoreLimit != other.CoreLimit) return false; if (!object.Equals(PayloadConfig, other.PayloadConfig)) return false; if(!coreList_.Equals(other.coreList_)) return false; + if (OtherServerApi != other.OtherServerApi) return false; return true; } @@ -1582,6 +1641,7 @@ namespace Grpc.Testing { if (CoreLimit != 0) hash ^= CoreLimit.GetHashCode(); if (payloadConfig_ != null) hash ^= PayloadConfig.GetHashCode(); hash ^= coreList_.GetHashCode(); + if (OtherServerApi.Length != 0) hash ^= OtherServerApi.GetHashCode(); return hash; } @@ -1615,6 +1675,10 @@ namespace Grpc.Testing { output.WriteMessage(PayloadConfig); } coreList_.WriteTo(output, _repeated_coreList_codec); + if (OtherServerApi.Length != 0) { + output.WriteRawTag(90); + output.WriteString(OtherServerApi); + } } public int CalculateSize() { @@ -1638,6 +1702,9 @@ namespace Grpc.Testing { size += 1 + pb::CodedOutputStream.ComputeMessageSize(PayloadConfig); } size += coreList_.CalculateSize(_repeated_coreList_codec); + if (OtherServerApi.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(OtherServerApi); + } return size; } @@ -1670,6 +1737,9 @@ namespace Grpc.Testing { PayloadConfig.MergeFrom(other.PayloadConfig); } coreList_.Add(other.coreList_); + if (other.OtherServerApi.Length != 0) { + OtherServerApi = other.OtherServerApi; + } } public void MergeFrom(pb::CodedInputStream input) { @@ -1714,6 +1784,10 @@ namespace Grpc.Testing { coreList_.AddEntriesFrom(input, _repeated_coreList_codec); break; } + case 90: { + OtherServerApi = input.ReadString(); + break; + } } } } diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index c16d0e5c5d..9685cf1837 100644 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -113,6 +113,9 @@ <Compile Include="GeneratedClientTest.cs" /> <Compile Include="InterarrivalTimers.cs" /> <Compile Include="NUnitMain.cs" /> + <Compile Include="StressTestClient.cs" /> + <Compile Include="Metrics.cs" /> + <Compile Include="MetricsGrpc.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup> diff --git a/src/csharp/Grpc.IntegrationTesting/Metrics.cs b/src/csharp/Grpc.IntegrationTesting/Metrics.cs new file mode 100644 index 0000000000..3163949d32 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/Metrics.cs @@ -0,0 +1,452 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: src/proto/grpc/testing/metrics.proto +#pragma warning disable 1591, 0612, 3021 +#region Designer generated code + +using pb = global::Google.Protobuf; +using pbc = global::Google.Protobuf.Collections; +using pbr = global::Google.Protobuf.Reflection; +using scg = global::System.Collections.Generic; +namespace Grpc.Testing { + + /// <summary>Holder for reflection information generated from src/proto/grpc/testing/metrics.proto</summary> + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + public static partial class MetricsReflection { + + #region Descriptor + /// <summary>File descriptor for src/proto/grpc/testing/metrics.proto</summary> + public static pbr::FileDescriptor Descriptor { + get { return descriptor; } + } + private static pbr::FileDescriptor descriptor; + + static MetricsReflection() { + byte[] descriptorData = global::System.Convert.FromBase64String( + string.Concat( + "CiRzcmMvcHJvdG8vZ3JwYy90ZXN0aW5nL21ldHJpY3MucHJvdG8SDGdycGMu", + "dGVzdGluZyJsCg1HYXVnZVJlc3BvbnNlEgwKBG5hbWUYASABKAkSFAoKbG9u", + "Z192YWx1ZRgCIAEoA0gAEhYKDGRvdWJsZV92YWx1ZRgDIAEoAUgAEhYKDHN0", + "cmluZ192YWx1ZRgEIAEoCUgAQgcKBXZhbHVlIhwKDEdhdWdlUmVxdWVzdBIM", + "CgRuYW1lGAEgASgJIg4KDEVtcHR5TWVzc2FnZTKgAQoOTWV0cmljc1NlcnZp", + "Y2USSQoMR2V0QWxsR2F1Z2VzEhouZ3JwYy50ZXN0aW5nLkVtcHR5TWVzc2Fn", + "ZRobLmdycGMudGVzdGluZy5HYXVnZVJlc3BvbnNlMAESQwoIR2V0R2F1Z2US", + "Gi5ncnBjLnRlc3RpbmcuR2F1Z2VSZXF1ZXN0GhsuZ3JwYy50ZXN0aW5nLkdh", + "dWdlUmVzcG9uc2ViBnByb3RvMw==")); + descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, + new pbr::FileDescriptor[] { }, + new pbr::GeneratedCodeInfo(null, new pbr::GeneratedCodeInfo[] { + new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.GaugeResponse), global::Grpc.Testing.GaugeResponse.Parser, new[]{ "Name", "LongValue", "DoubleValue", "StringValue" }, new[]{ "Value" }, null, null), + new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.GaugeRequest), global::Grpc.Testing.GaugeRequest.Parser, new[]{ "Name" }, null, null, null), + new pbr::GeneratedCodeInfo(typeof(global::Grpc.Testing.EmptyMessage), global::Grpc.Testing.EmptyMessage.Parser, null, null, null, null) + })); + } + #endregion + + } + #region Messages + /// <summary> + /// Reponse message containing the gauge name and value + /// </summary> + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + public sealed partial class GaugeResponse : pb::IMessage<GaugeResponse> { + private static readonly pb::MessageParser<GaugeResponse> _parser = new pb::MessageParser<GaugeResponse>(() => new GaugeResponse()); + public static pb::MessageParser<GaugeResponse> Parser { get { return _parser; } } + + public static pbr::MessageDescriptor Descriptor { + get { return global::Grpc.Testing.MetricsReflection.Descriptor.MessageTypes[0]; } + } + + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + public GaugeResponse() { + OnConstruction(); + } + + partial void OnConstruction(); + + public GaugeResponse(GaugeResponse other) : this() { + name_ = other.name_; + switch (other.ValueCase) { + case ValueOneofCase.LongValue: + LongValue = other.LongValue; + break; + case ValueOneofCase.DoubleValue: + DoubleValue = other.DoubleValue; + break; + case ValueOneofCase.StringValue: + StringValue = other.StringValue; + break; + } + + } + + public GaugeResponse Clone() { + return new GaugeResponse(this); + } + + /// <summary>Field number for the "name" field.</summary> + public const int NameFieldNumber = 1; + private string name_ = ""; + public string Name { + get { return name_; } + set { + name_ = pb::Preconditions.CheckNotNull(value, "value"); + } + } + + /// <summary>Field number for the "long_value" field.</summary> + public const int LongValueFieldNumber = 2; + public long LongValue { + get { return valueCase_ == ValueOneofCase.LongValue ? (long) value_ : 0L; } + set { + value_ = value; + valueCase_ = ValueOneofCase.LongValue; + } + } + + /// <summary>Field number for the "double_value" field.</summary> + public const int DoubleValueFieldNumber = 3; + public double DoubleValue { + get { return valueCase_ == ValueOneofCase.DoubleValue ? (double) value_ : 0D; } + set { + value_ = value; + valueCase_ = ValueOneofCase.DoubleValue; + } + } + + /// <summary>Field number for the "string_value" field.</summary> + public const int StringValueFieldNumber = 4; + public string StringValue { + get { return valueCase_ == ValueOneofCase.StringValue ? (string) value_ : ""; } + set { + value_ = pb::Preconditions.CheckNotNull(value, "value"); + valueCase_ = ValueOneofCase.StringValue; + } + } + + private object value_; + /// <summary>Enum of possible cases for the "value" oneof.</summary> + public enum ValueOneofCase { + None = 0, + LongValue = 2, + DoubleValue = 3, + StringValue = 4, + } + private ValueOneofCase valueCase_ = ValueOneofCase.None; + public ValueOneofCase ValueCase { + get { return valueCase_; } + } + + public void ClearValue() { + valueCase_ = ValueOneofCase.None; + value_ = null; + } + + public override bool Equals(object other) { + return Equals(other as GaugeResponse); + } + + public bool Equals(GaugeResponse other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Name != other.Name) return false; + if (LongValue != other.LongValue) return false; + if (DoubleValue != other.DoubleValue) return false; + if (StringValue != other.StringValue) return false; + if (ValueCase != other.ValueCase) return false; + return true; + } + + public override int GetHashCode() { + int hash = 1; + if (Name.Length != 0) hash ^= Name.GetHashCode(); + if (valueCase_ == ValueOneofCase.LongValue) hash ^= LongValue.GetHashCode(); + if (valueCase_ == ValueOneofCase.DoubleValue) hash ^= DoubleValue.GetHashCode(); + if (valueCase_ == ValueOneofCase.StringValue) hash ^= StringValue.GetHashCode(); + hash ^= (int) valueCase_; + return hash; + } + + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + public void WriteTo(pb::CodedOutputStream output) { + if (Name.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Name); + } + if (valueCase_ == ValueOneofCase.LongValue) { + output.WriteRawTag(16); + output.WriteInt64(LongValue); + } + if (valueCase_ == ValueOneofCase.DoubleValue) { + output.WriteRawTag(25); + output.WriteDouble(DoubleValue); + } + if (valueCase_ == ValueOneofCase.StringValue) { + output.WriteRawTag(34); + output.WriteString(StringValue); + } + } + + public int CalculateSize() { + int size = 0; + if (Name.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Name); + } + if (valueCase_ == ValueOneofCase.LongValue) { + size += 1 + pb::CodedOutputStream.ComputeInt64Size(LongValue); + } + if (valueCase_ == ValueOneofCase.DoubleValue) { + size += 1 + 8; + } + if (valueCase_ == ValueOneofCase.StringValue) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(StringValue); + } + return size; + } + + public void MergeFrom(GaugeResponse other) { + if (other == null) { + return; + } + if (other.Name.Length != 0) { + Name = other.Name; + } + switch (other.ValueCase) { + case ValueOneofCase.LongValue: + LongValue = other.LongValue; + break; + case ValueOneofCase.DoubleValue: + DoubleValue = other.DoubleValue; + break; + case ValueOneofCase.StringValue: + StringValue = other.StringValue; + break; + } + + } + + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + Name = input.ReadString(); + break; + } + case 16: { + LongValue = input.ReadInt64(); + break; + } + case 25: { + DoubleValue = input.ReadDouble(); + break; + } + case 34: { + StringValue = input.ReadString(); + break; + } + } + } + } + + } + + /// <summary> + /// Request message containing the gauge name + /// </summary> + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + public sealed partial class GaugeRequest : pb::IMessage<GaugeRequest> { + private static readonly pb::MessageParser<GaugeRequest> _parser = new pb::MessageParser<GaugeRequest>(() => new GaugeRequest()); + public static pb::MessageParser<GaugeRequest> Parser { get { return _parser; } } + + public static pbr::MessageDescriptor Descriptor { + get { return global::Grpc.Testing.MetricsReflection.Descriptor.MessageTypes[1]; } + } + + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + public GaugeRequest() { + OnConstruction(); + } + + partial void OnConstruction(); + + public GaugeRequest(GaugeRequest other) : this() { + name_ = other.name_; + } + + public GaugeRequest Clone() { + return new GaugeRequest(this); + } + + /// <summary>Field number for the "name" field.</summary> + public const int NameFieldNumber = 1; + private string name_ = ""; + public string Name { + get { return name_; } + set { + name_ = pb::Preconditions.CheckNotNull(value, "value"); + } + } + + public override bool Equals(object other) { + return Equals(other as GaugeRequest); + } + + public bool Equals(GaugeRequest other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + if (Name != other.Name) return false; + return true; + } + + public override int GetHashCode() { + int hash = 1; + if (Name.Length != 0) hash ^= Name.GetHashCode(); + return hash; + } + + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + public void WriteTo(pb::CodedOutputStream output) { + if (Name.Length != 0) { + output.WriteRawTag(10); + output.WriteString(Name); + } + } + + public int CalculateSize() { + int size = 0; + if (Name.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeStringSize(Name); + } + return size; + } + + public void MergeFrom(GaugeRequest other) { + if (other == null) { + return; + } + if (other.Name.Length != 0) { + Name = other.Name; + } + } + + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + case 10: { + Name = input.ReadString(); + break; + } + } + } + } + + } + + [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] + public sealed partial class EmptyMessage : pb::IMessage<EmptyMessage> { + private static readonly pb::MessageParser<EmptyMessage> _parser = new pb::MessageParser<EmptyMessage>(() => new EmptyMessage()); + public static pb::MessageParser<EmptyMessage> Parser { get { return _parser; } } + + public static pbr::MessageDescriptor Descriptor { + get { return global::Grpc.Testing.MetricsReflection.Descriptor.MessageTypes[2]; } + } + + pbr::MessageDescriptor pb::IMessage.Descriptor { + get { return Descriptor; } + } + + public EmptyMessage() { + OnConstruction(); + } + + partial void OnConstruction(); + + public EmptyMessage(EmptyMessage other) : this() { + } + + public EmptyMessage Clone() { + return new EmptyMessage(this); + } + + public override bool Equals(object other) { + return Equals(other as EmptyMessage); + } + + public bool Equals(EmptyMessage other) { + if (ReferenceEquals(other, null)) { + return false; + } + if (ReferenceEquals(other, this)) { + return true; + } + return true; + } + + public override int GetHashCode() { + int hash = 1; + return hash; + } + + public override string ToString() { + return pb::JsonFormatter.ToDiagnosticString(this); + } + + public void WriteTo(pb::CodedOutputStream output) { + } + + public int CalculateSize() { + int size = 0; + return size; + } + + public void MergeFrom(EmptyMessage other) { + if (other == null) { + return; + } + } + + public void MergeFrom(pb::CodedInputStream input) { + uint tag; + while ((tag = input.ReadTag()) != 0) { + switch(tag) { + default: + input.SkipLastField(); + break; + } + } + } + + } + + #endregion + +} + +#endregion Designer generated code diff --git a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs new file mode 100644 index 0000000000..cc01ae91a1 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs @@ -0,0 +1,146 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: src/proto/grpc/testing/metrics.proto +#region Designer generated code + +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; + +namespace Grpc.Testing { + public static class MetricsService + { + static readonly string __ServiceName = "grpc.testing.MetricsService"; + + static readonly Marshaller<global::Grpc.Testing.EmptyMessage> __Marshaller_EmptyMessage = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.EmptyMessage.Parser.ParseFrom); + static readonly Marshaller<global::Grpc.Testing.GaugeResponse> __Marshaller_GaugeResponse = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.GaugeResponse.Parser.ParseFrom); + static readonly Marshaller<global::Grpc.Testing.GaugeRequest> __Marshaller_GaugeRequest = Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.GaugeRequest.Parser.ParseFrom); + + static readonly Method<global::Grpc.Testing.EmptyMessage, global::Grpc.Testing.GaugeResponse> __Method_GetAllGauges = new Method<global::Grpc.Testing.EmptyMessage, global::Grpc.Testing.GaugeResponse>( + MethodType.ServerStreaming, + __ServiceName, + "GetAllGauges", + __Marshaller_EmptyMessage, + __Marshaller_GaugeResponse); + + static readonly Method<global::Grpc.Testing.GaugeRequest, global::Grpc.Testing.GaugeResponse> __Method_GetGauge = new Method<global::Grpc.Testing.GaugeRequest, global::Grpc.Testing.GaugeResponse>( + MethodType.Unary, + __ServiceName, + "GetGauge", + __Marshaller_GaugeRequest, + __Marshaller_GaugeResponse); + + // service descriptor + public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor + { + get { return global::Grpc.Testing.MetricsReflection.Descriptor.Services[0]; } + } + + // client interface + [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")] + public interface IMetricsServiceClient + { + AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, CallOptions options); + global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, CallOptions options); + AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)); + AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, CallOptions options); + } + + // server-side interface + [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")] + public interface IMetricsService + { + Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context); + Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context); + } + + // server-side abstract class + public abstract class MetricsServiceBase + { + public virtual Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context) + { + throw new RpcException(new Status(StatusCode.Unimplemented, "")); + } + + public virtual Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context) + { + throw new RpcException(new Status(StatusCode.Unimplemented, "")); + } + + } + + // client stub + public class MetricsServiceClient : ClientBase<MetricsServiceClient>, IMetricsServiceClient + { + public MetricsServiceClient(Channel channel) : base(channel) + { + } + public MetricsServiceClient(CallInvoker callInvoker) : base(callInvoker) + { + } + ///<summary>Protected parameterless constructor to allow creation of test doubles.</summary> + protected MetricsServiceClient() : base() + { + } + ///<summary>Protected constructor to allow creation of configured clients.</summary> + protected MetricsServiceClient(ClientBaseConfiguration configuration) : base(configuration) + { + } + + public virtual AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + return GetAllGauges(request, new CallOptions(headers, deadline, cancellationToken)); + } + public virtual AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, CallOptions options) + { + return CallInvoker.AsyncServerStreamingCall(__Method_GetAllGauges, null, options, request); + } + public virtual global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + return GetGauge(request, new CallOptions(headers, deadline, cancellationToken)); + } + public virtual global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, CallOptions options) + { + return CallInvoker.BlockingUnaryCall(__Method_GetGauge, null, options, request); + } + public virtual AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken)) + { + return GetGaugeAsync(request, new CallOptions(headers, deadline, cancellationToken)); + } + public virtual AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, CallOptions options) + { + return CallInvoker.AsyncUnaryCall(__Method_GetGauge, null, options, request); + } + protected override MetricsServiceClient NewInstance(ClientBaseConfiguration configuration) + { + return new MetricsServiceClient(configuration); + } + } + + // creates service definition that can be registered with a server + public static ServerServiceDefinition BindService(IMetricsService serviceImpl) + { + return ServerServiceDefinition.CreateBuilder(__ServiceName) + .AddMethod(__Method_GetAllGauges, serviceImpl.GetAllGauges) + .AddMethod(__Method_GetGauge, serviceImpl.GetGauge).Build(); + } + + // creates service definition that can be registered with a server + public static ServerServiceDefinition BindService(MetricsServiceBase serviceImpl) + { + return ServerServiceDefinition.CreateBuilder(__ServiceName) + .AddMethod(__Method_GetAllGauges, serviceImpl.GetAllGauges) + .AddMethod(__Method_GetGauge, serviceImpl.GetGauge).Build(); + } + + // creates a new client + public static MetricsServiceClient NewClient(Channel channel) + { + return new MetricsServiceClient(channel); + } + + } +} +#endregion diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs new file mode 100644 index 0000000000..8db691cb04 --- /dev/null +++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs @@ -0,0 +1,318 @@ +#region Copyright notice and license + +// Copyright 2015-2016, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#endregion + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using CommandLine; +using CommandLine.Text; +using Grpc.Core; +using Grpc.Core.Logging; +using Grpc.Core.Utils; +using Grpc.Testing; + +namespace Grpc.IntegrationTesting +{ + public class StressTestClient + { + static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<StressTestClient>(); + const double SecondsToNanos = 1e9; + + private class ClientOptions + { + [Option("server_addresses", DefaultValue = "localhost:8080")] + public string ServerAddresses { get; set; } + + [Option("test_cases", DefaultValue = "large_unary:100")] + public string TestCases { get; set; } + + [Option("test_duration_secs", DefaultValue = -1)] + public int TestDurationSecs { get; set; } + + [Option("num_channels_per_server", DefaultValue = 1)] + public int NumChannelsPerServer { get; set; } + + [Option("num_stubs_per_channel", DefaultValue = 1)] + public int NumStubsPerChannel { get; set; } + + [Option("metrics_port", DefaultValue = 8081)] + public int MetricsPort { get; set; } + + [HelpOption] + public string GetUsage() + { + var help = new HelpText + { + Heading = "gRPC C# stress test client", + AddDashesToOption = true + }; + help.AddPreOptionsLine("Usage:"); + help.AddOptions(this); + return help; + } + } + + ClientOptions options; + List<string> serverAddresses; + Dictionary<string, int> weightedTestCases; + WeightedRandomGenerator testCaseGenerator; + + // cancellation will be emitted once test_duration_secs has elapsed. + CancellationTokenSource finishedTokenSource = new CancellationTokenSource(); + Histogram histogram = new Histogram(0.01, 60 * SecondsToNanos); + + private StressTestClient(ClientOptions options, List<string> serverAddresses, Dictionary<string, int> weightedTestCases) + { + this.options = options; + this.serverAddresses = serverAddresses; + this.weightedTestCases = weightedTestCases; + this.testCaseGenerator = new WeightedRandomGenerator(this.weightedTestCases); + } + + public static void Run(string[] args) + { + var options = new ClientOptions(); + if (!Parser.Default.ParseArguments(args, options)) + { + Environment.Exit(1); + } + + GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0); + GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0); + + var serverAddresses = options.ServerAddresses.Split(','); + GrpcPreconditions.CheckArgument(serverAddresses.Length > 0, "You need to provide at least one server address"); + + var testCases = ParseWeightedTestCases(options.TestCases); + GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case"); + + var interopClient = new StressTestClient(options, serverAddresses.ToList(), testCases); + interopClient.Run().Wait(); + } + + async Task Run() + { + var metricsServer = new Server() + { + Services = { MetricsService.BindService(new MetricsServiceImpl(histogram)) }, + Ports = { { "[::]", options.MetricsPort, ServerCredentials.Insecure } } + }; + metricsServer.Start(); + + if (options.TestDurationSecs >= 0) + { + finishedTokenSource.CancelAfter(TimeSpan.FromSeconds(options.TestDurationSecs)); + } + + var tasks = new List<Task>(); + var channels = new List<Channel>(); + foreach (var serverAddress in serverAddresses) + { + for (int i = 0; i < options.NumChannelsPerServer; i++) + { + var channel = new Channel(serverAddress, ChannelCredentials.Insecure); + channels.Add(channel); + for (int j = 0; j < options.NumStubsPerChannel; j++) + { + var client = TestService.NewClient(channel); + var task = Task.Factory.StartNew(() => RunBodyAsync(client).GetAwaiter().GetResult(), + TaskCreationOptions.LongRunning); + tasks.Add(task); + } + } + } + await Task.WhenAll(tasks); + + foreach (var channel in channels) + { + await channel.ShutdownAsync(); + } + + await metricsServer.ShutdownAsync(); + } + + async Task RunBodyAsync(TestService.TestServiceClient client) + { + Logger.Info("Starting stress test client thread."); + while (!finishedTokenSource.Token.IsCancellationRequested) + { + var testCase = testCaseGenerator.GetNext(); + + var stopwatch = Stopwatch.StartNew(); + + await RunTestCaseAsync(client, testCase); + + stopwatch.Stop(); + histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); + } + Logger.Info("Stress test client thread finished."); + } + + async Task RunTestCaseAsync(TestService.TestServiceClient client, string testCase) + { + switch (testCase) + { + case "empty_unary": + InteropClient.RunEmptyUnary(client); + break; + case "large_unary": + InteropClient.RunLargeUnary(client); + break; + case "client_streaming": + await InteropClient.RunClientStreamingAsync(client); + break; + case "server_streaming": + await InteropClient.RunServerStreamingAsync(client); + break; + case "ping_pong": + await InteropClient.RunPingPongAsync(client); + break; + case "empty_stream": + await InteropClient.RunEmptyStreamAsync(client); + break; + case "cancel_after_begin": + await InteropClient.RunCancelAfterBeginAsync(client); + break; + case "cancel_after_first_response": + await InteropClient.RunCancelAfterFirstResponseAsync(client); + break; + case "timeout_on_sleeping_server": + await InteropClient.RunTimeoutOnSleepingServerAsync(client); + break; + case "custom_metadata": + await InteropClient.RunCustomMetadataAsync(client); + break; + case "status_code_and_message": + await InteropClient.RunStatusCodeAndMessageAsync(client); + break; + default: + throw new ArgumentException("Unsupported test case " + testCase); + } + } + + static Dictionary<string, int> ParseWeightedTestCases(string weightedTestCases) + { + var result = new Dictionary<string, int>(); + foreach (var weightedTestCase in weightedTestCases.Split(',')) + { + var parts = weightedTestCase.Split(new char[] {':'}, 2); + GrpcPreconditions.CheckArgument(parts.Length == 2, "Malformed test_cases option."); + result.Add(parts[0], int.Parse(parts[1])); + } + return result; + } + + class WeightedRandomGenerator + { + readonly Random random = new Random(); + readonly List<Tuple<int, string>> cumulativeSums; + readonly int weightSum; + + public WeightedRandomGenerator(Dictionary<string, int> weightedItems) + { + cumulativeSums = new List<Tuple<int, string>>(); + weightSum = 0; + foreach (var entry in weightedItems) + { + weightSum += entry.Value; + cumulativeSums.Add(Tuple.Create(weightSum, entry.Key)); + } + } + + public string GetNext() + { + int rand = random.Next(weightSum); + foreach (var entry in cumulativeSums) + { + if (rand < entry.Item1) + { + return entry.Item2; + } + } + throw new InvalidOperationException("GetNext() failed."); + } + } + + class MetricsServiceImpl : MetricsService.MetricsServiceBase + { + const string GaugeName = "csharp_overall_qps"; + + readonly Histogram histogram; + readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); + + public MetricsServiceImpl(Histogram histogram) + { + this.histogram = histogram; + } + + public override Task<GaugeResponse> GetGauge(GaugeRequest request, ServerCallContext context) + { + if (request.Name == GaugeName) + { + long qps = GetQpsAndReset(); + + return Task.FromResult(new GaugeResponse + { + Name = GaugeName, + LongValue = qps + }); + } + throw new RpcException(new Status(StatusCode.InvalidArgument, "Gauge does not exist")); + } + + public override async Task GetAllGauges(EmptyMessage request, IServerStreamWriter<GaugeResponse> responseStream, ServerCallContext context) + { + long qps = GetQpsAndReset(); + + var response = new GaugeResponse + { + Name = GaugeName, + LongValue = qps + }; + await responseStream.WriteAsync(response); + } + + long GetQpsAndReset() + { + var snapshot = histogram.GetSnapshot(true); + var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true); + + return (long) (snapshot.Count / elapsedSnapshot.Seconds); + } + } + } +} diff --git a/src/csharp/Grpc.sln b/src/csharp/Grpc.sln index 8ff35e8c0d..9be36c0caa 100644 --- a/src/csharp/Grpc.sln +++ b/src/csharp/Grpc.sln @@ -34,6 +34,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Grpc.HealthCheck.Tests", "G EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Grpc.IntegrationTesting.QpsWorker", "Grpc.IntegrationTesting.QpsWorker\Grpc.IntegrationTesting.QpsWorker.csproj", "{B82B7DFE-7F7B-40EF-B3D6-064FF2B01294}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Grpc.IntegrationTesting.StressClient", "Grpc.IntegrationTesting.StressClient\Grpc.IntegrationTesting.StressClient.csproj", "{ADEBA147-80AE-4710-82E9-5B7F93690266}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -83,6 +85,12 @@ Global {AA5E328A-8835-49D7-98ED-C29F2B3049F0}.Release|Any CPU.Build.0 = Release|Any CPU
{AA5E328A-8835-49D7-98ED-C29F2B3049F0}.ReleaseSigned|Any CPU.ActiveCfg = ReleaseSigned|Any CPU
{AA5E328A-8835-49D7-98ED-C29F2B3049F0}.ReleaseSigned|Any CPU.Build.0 = ReleaseSigned|Any CPU
+ {ADEBA147-80AE-4710-82E9-5B7F93690266}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {ADEBA147-80AE-4710-82E9-5B7F93690266}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {ADEBA147-80AE-4710-82E9-5B7F93690266}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {ADEBA147-80AE-4710-82E9-5B7F93690266}.Release|Any CPU.Build.0 = Release|Any CPU
+ {ADEBA147-80AE-4710-82E9-5B7F93690266}.ReleaseSigned|Any CPU.ActiveCfg = Release|Any CPU
+ {ADEBA147-80AE-4710-82E9-5B7F93690266}.ReleaseSigned|Any CPU.Build.0 = Release|Any CPU
{AE21D0EE-9A2C-4C15-AB7F-5224EED5B0EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AE21D0EE-9A2C-4C15-AB7F-5224EED5B0EA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AE21D0EE-9A2C-4C15-AB7F-5224EED5B0EA}.Release|Any CPU.ActiveCfg = Release|Any CPU
diff --git a/src/csharp/generate_proto_csharp.sh b/src/csharp/generate_proto_csharp.sh index 9ac770b79d..79488e02a5 100755 --- a/src/csharp/generate_proto_csharp.sh +++ b/src/csharp/generate_proto_csharp.sh @@ -45,4 +45,4 @@ $PROTOC --plugin=$PLUGIN --csharp_out=$HEALTHCHECK_DIR --grpc_out=$HEALTHCHECK_D -I src/proto/grpc/health/v1 src/proto/grpc/health/v1/health.proto $PROTOC --plugin=$PLUGIN --csharp_out=$TESTING_DIR --grpc_out=$TESTING_DIR \ - -I . src/proto/grpc/testing/{control,empty,messages,payloads,services,stats,test}.proto + -I . src/proto/grpc/testing/{control,empty,messages,metrics,payloads,services,stats,test}.proto diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 28769ef653..20496a8116 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -35,14 +35,18 @@ import "src/proto/grpc/testing/stats.proto"; package grpc.testing; enum ClientType { + // Many languages support a basic distinction between using + // sync or async client, and this allows the specification SYNC_CLIENT = 0; ASYNC_CLIENT = 1; + OTHER_CLIENT = 2; // used for some language-specific variants } enum ServerType { SYNC_SERVER = 0; ASYNC_SERVER = 1; ASYNC_GENERIC_SERVER = 2; + OTHER_SERVER = 3; // used for some language-specific variants } enum RpcType { @@ -96,6 +100,9 @@ message ClientConfig { // Specify the cores we should run the client on, if desired repeated int32 core_list = 13; int32 core_limit = 14; + + // If we use an OTHER_CLIENT client_type, this string gives more detail + string other_client_api = 15; } message ClientStatus { ClientStats stats = 1; } @@ -127,6 +134,9 @@ message ServerConfig { // Specify the cores we should run the server on, if desired repeated int32 core_list = 10; + + // If we use an OTHER_SERVER client_type, this string gives more detail + string other_server_api = 11; } message ServerArgs { diff --git a/src/ruby/qps/src/proto/grpc/testing/control.rb b/src/ruby/qps/src/proto/grpc/testing/control.rb index b81e22659d..958fca320b 100644 --- a/src/ruby/qps/src/proto/grpc/testing/control.rb +++ b/src/ruby/qps/src/proto/grpc/testing/control.rb @@ -34,6 +34,7 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :histogram_params, :message, 12, "grpc.testing.HistogramParams" repeated :core_list, :int32, 13 optional :core_limit, :int32, 14 + optional :other_client_api, :string, 15 end add_message "grpc.testing.ClientStatus" do optional :stats, :message, 1, "grpc.testing.ClientStats" @@ -55,6 +56,7 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :core_limit, :int32, 8 optional :payload_config, :message, 9, "grpc.testing.PayloadConfig" repeated :core_list, :int32, 10 + optional :other_server_api, :string, 11 end add_message "grpc.testing.ServerArgs" do oneof :argtype do @@ -111,11 +113,13 @@ Google::Protobuf::DescriptorPool.generated_pool.build do add_enum "grpc.testing.ClientType" do value :SYNC_CLIENT, 0 value :ASYNC_CLIENT, 1 + value :OTHER_CLIENT, 2 end add_enum "grpc.testing.ServerType" do value :SYNC_SERVER, 0 value :ASYNC_SERVER, 1 value :ASYNC_GENERIC_SERVER, 2 + value :OTHER_SERVER, 3 end add_enum "grpc.testing.RpcType" do value :UNARY, 0 |