diff options
author | Craig Tiller <ctiller@google.com> | 2016-10-28 07:22:08 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-10-28 07:22:08 -0700 |
commit | b7982319c9e43a4219a1517dec97b1c6876b439d (patch) | |
tree | a14da6953b81aa91c4ac6dd3e1615ea54807a29a | |
parent | 28b72428a84ae7d23c106fb282478fa94951c9c0 (diff) | |
parent | 6628e36a89b3ec709477b7ff494ec5e237ac8231 (diff) |
Merge github.com:grpc/grpc into grpc_slice
30 files changed, 833 insertions, 202 deletions
diff --git a/package.json b/package.json index 9afba31816..c6b2600209 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,8 @@ }, "devDependencies": { "async": "^1.5.0", + "body-parser": "^1.15.2", + "express": "^4.14.0", "google-auth-library": "^0.9.2", "google-protobuf": "^3.0.0", "istanbul": "^0.3.21", diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 80b4f048c2..ff773ac334 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -1022,6 +1022,10 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); gpr_mu_destroy(&calld->mu); GPR_ASSERT(calld->waiting_ops_count == 0); + if (calld->connected_subchannel != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, + "picked"); + } gpr_free(calld->waiting_ops); gpr_free(and_free_memory); } diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index f10d74ced6..a5d7c0df45 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -183,9 +183,10 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -void grpc_connected_subchannel_ref( +grpc_connected_subchannel *grpc_connected_subchannel_ref( grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); + return c; } void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 5652074074..93bd72d20d 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -97,7 +97,7 @@ grpc_subchannel *grpc_subchannel_weak_ref( void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_connected_subchannel_ref( +grpc_connected_subchannel *grpc_connected_subchannel_ref( grpc_connected_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *channel diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 5d3433df74..ac3c6a305a 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -209,7 +209,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, /* Check atomically for a selected channel */ grpc_connected_subchannel *selected = GET_SELECTED(p); if (selected != NULL) { - *target = selected; + *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); return 1; } @@ -218,7 +218,7 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, selected = GET_SELECTED(p); if (selected) { gpr_mu_unlock(&p->mu); - *target = selected; + *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); return 1; } else { if (!p->started_picking) { @@ -310,7 +310,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = selected; + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index c0743b00e8..37a9b18b97 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -397,7 +397,9 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { /* readily available, report right away */ - *target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + *target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected->subchannel), + "picked"); if (user_data != NULL) { *user_data = selected->user_data; @@ -463,8 +465,9 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = - grpc_subchannel_get_connected_subchannel(selected->subchannel); + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected->subchannel), + "picked"); if (pp->user_data != NULL) { *pp->user_data = selected->user_data; } @@ -578,7 +581,9 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { gpr_mu_unlock(&p->mu); - target = grpc_subchannel_get_connected_subchannel(selected->subchannel); + target = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected->subchannel), + "picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); } else { gpr_mu_unlock(&p->mu); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 3df64faa1f..1ffa9165b2 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1023,9 +1023,14 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, } if (!s->write_closed) { if (t->is_client) { - GPR_ASSERT(s->id == 0); - grpc_chttp2_list_add_waiting_for_concurrency(t, s); - maybe_start_some_streams(exec_ctx, t); + if (!t->closed) { + GPR_ASSERT(s->id == 0); + grpc_chttp2_list_add_waiting_for_concurrency(t, s); + maybe_start_some_streams(exec_ctx, t); + } else { + grpc_chttp2_cancel_stream(exec_ctx, t, s, + GRPC_ERROR_CREATE("Transport closed")); + } } else { GPR_ASSERT(s->id != 0); grpc_chttp2_become_writable(exec_ctx, t, s, true, @@ -2190,7 +2195,7 @@ static void destructive_reclaimer_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_INT_HTTP2_ERROR, GRPC_CHTTP2_ENHANCE_YOUR_CALM)); if (n > 1) { - /* Since we cancel one stream per destructive reclaimation, if + /* Since we cancel one stream per destructive reclamation, if there are more streams left, we can immediately post a new reclaimer in case the resource quota needs to free more memory */ diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 8381f4a63a..db51ec4939 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1711,6 +1711,12 @@ retry: "pollset_add_fd: Raced creating new polling island. pi_new: %p " "(fd: %d, pollset: %p)", (void *)pi_new, fd->fd, (void *)pollset); + + /* No need to lock 'pi_new' here since this is a new polling island and + * no one has a reference to it yet */ + polling_island_remove_all_fds_locked(pi_new, true, &error); + + /* Ref and unref so that the polling island gets deleted during unref */ PI_ADD_REF(pi_new, "dance_of_destruction"); PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); goto retry; diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 6d7e549bb7..ddc7a88c5b 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -89,6 +89,7 @@ static void rulist_add_head(grpc_resource_user *resource_user, resource_user->links[list].prev = (*root)->links[list].prev; resource_user->links[list].next->links[list].prev = resource_user->links[list].prev->links[list].next = resource_user; + *root = resource_user; } } @@ -105,7 +106,6 @@ static void rulist_add_tail(grpc_resource_user *resource_user, resource_user->links[list].prev = *root; resource_user->links[list].next->links[list].prev = resource_user->links[list].prev->links[list].next = resource_user; - *root = resource_user; } } @@ -114,7 +114,7 @@ static bool rulist_empty(grpc_resource_quota *resource_quota, return resource_quota->roots[list] == NULL; } -static grpc_resource_user *rulist_pop_tail(grpc_resource_quota *resource_quota, +static grpc_resource_user *rulist_pop_head(grpc_resource_quota *resource_quota, grpc_rulist list) { grpc_resource_user **root = &resource_quota->roots[list]; grpc_resource_user *resource_user = *root; @@ -186,7 +186,7 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { grpc_resource_user *resource_user; - while ((resource_user = rulist_pop_tail(resource_quota, + while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_AWAITING_ALLOCATION))) { gpr_mu_lock(&resource_user->mu); if (resource_user->free_pool < 0 && @@ -209,7 +209,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL); gpr_mu_unlock(&resource_user->mu); } else { - rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); + rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); gpr_mu_unlock(&resource_user->mu); return false; } @@ -221,7 +221,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, static bool rq_reclaim_from_per_user_free_pool( grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { grpc_resource_user *resource_user; - while ((resource_user = rulist_pop_tail(resource_quota, + while ((resource_user = rulist_pop_head(resource_quota, GRPC_RULIST_NON_EMPTY_FREE_POOL))) { gpr_mu_lock(&resource_user->mu); if (resource_user->free_pool > 0) { @@ -249,7 +249,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx, if (resource_quota->reclaiming) return true; grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE : GRPC_RULIST_RECLAIMER_BENIGN; - grpc_resource_user *resource_user = rulist_pop_tail(resource_quota, list); + grpc_resource_user *resource_user = rulist_pop_head(resource_quota, list); if (resource_user == NULL) return false; if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "RQ %s %s: initiate %s reclamation", @@ -325,7 +325,7 @@ static void ru_allocate(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { GRPC_RULIST_AWAITING_ALLOCATION)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); + rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); } static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru, @@ -337,7 +337,7 @@ static void ru_add_to_free_pool(grpc_exec_ctx *exec_ctx, void *ru, GRPC_RULIST_NON_EMPTY_FREE_POOL)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_head(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); + rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL); } static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, @@ -351,7 +351,7 @@ static void ru_post_benign_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, GRPC_RULIST_RECLAIMER_BENIGN)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); + rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN); } static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, @@ -367,7 +367,7 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) { rq_step_sched(exec_ctx, resource_user->resource_quota); } - rulist_add_head(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); + rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); } static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { @@ -563,9 +563,6 @@ void grpc_resource_user_init(grpc_resource_user *resource_user, for (int i = 0; i < GRPC_RULIST_COUNT; i++) { resource_user->links[i].next = resource_user->links[i].prev = NULL; } -#ifndef NDEBUG - resource_user->asan_canary = gpr_malloc(1); -#endif if (name != NULL) { resource_user->name = gpr_strdup(name); } else { @@ -592,9 +589,6 @@ void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user) { -#ifndef NDEBUG - gpr_free(resource_user->asan_canary); -#endif grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota); gpr_mu_destroy(&resource_user->mu); gpr_free(resource_user->name); diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index f644e5bf4d..f7e5ca6494 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -49,7 +49,8 @@ resource constrained, grpc_resource_user instances are asked (in turn) to free up whatever they can so that the system as a whole can make progress. - There are three kinds of reclamation that take place: + There are three kinds of reclamation that take place, in order of increasing + invasiveness: - an internal reclamation, where cached resource at the resource user level is returned to the quota - a benign reclamation phase, whereby resources that are in use but are not @@ -58,9 +59,14 @@ make progress may be enacted so that at least one part of the system can complete. - These reclamations are tried in priority order, and only one reclamation - is outstanding for a quota at any given time (meaning that if a destructive - reclamation makes progress, we may follow up with a benign reclamation). + Only one reclamation will be outstanding for a given quota at a given time. + On each reclamation attempt, the kinds of reclamation are tried in order of + increasing invasiveness, stopping at the first one that succeeds. Thus, on a + given reclamation attempt, if internal and benign reclamation both fail, it + will wind up doing a destructive reclamation. However, the next reclamation + attempt may then be able to get what it needs via internal or benign + reclamation, due to resources that may have been freed up by the destructive + reclamation in the previous attempt. Future work will be to expose the current resource pressure so that back pressure can be applied to avoid reclamation phases starting. @@ -112,11 +118,6 @@ struct grpc_resource_user { lock */ grpc_closure add_to_free_pool_closure; -#ifndef NDEBUG - /* Canary object to detect leaked resource users with ASAN */ - void *asan_canary; -#endif - gpr_mu mu; /* Total allocated memory outstanding by this resource user in bytes; always positive */ diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index f44829959d..6f54ae5383 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -315,6 +315,9 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); } + /* Disable Nagle's Algorithm */ + uv_tcp_nodelay(handle, 1); + memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; tcp->handle = handle; diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 5d763b3875..798f582cad 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -842,6 +842,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, grpc_call_stack_element(grpc_call_get_call_stack(call), 0); if (error != GRPC_ERROR_NONE) { got_initial_metadata(exec_ctx, elem, error); + GRPC_ERROR_UNREF(error); return; } call_data *calld = elem->call_data; diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs index 49d0a111ef..5e61e9ec12 100644 --- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs +++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs @@ -52,9 +52,8 @@ namespace Grpc.Core.Internal // Completion of a pending unary response if not null. TaskCompletionSource<TResponse> unaryResponseTcs; - // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls. - // Indicates that response streaming call has finished. - TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>(); + // Completion of a streaming response call if not null. + TaskCompletionSource<object> streamingResponseCallFinishedTcs; // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers). // Response headers set here once received. @@ -198,6 +197,7 @@ namespace Grpc.Core.Internal byte[] payload = UnsafeSerialize(msg); + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartServerStreaming(HandleFinished, payload, metadataArray, GetWriteFlagsForCall()); @@ -219,6 +219,7 @@ namespace Grpc.Core.Internal Initialize(details.Channel.CompletionQueue); + streamingResponseCallFinishedTcs = new TaskCompletionSource<object>(); using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers)) { call.StartDuplexStreaming(HandleFinished, metadataArray); @@ -276,13 +277,13 @@ namespace Grpc.Core.Internal } /// <summary> - /// Get the task that completes once if streaming call finishes with ok status and throws RpcException with given status otherwise. + /// Get the task that completes once if streaming response call finishes with ok status and throws RpcException with given status otherwise. /// </summary> - public Task StreamingCallFinishedTask + public Task StreamingResponseCallFinishedTask { get { - return streamingCallFinishedTcs.Task; + return streamingResponseCallFinishedTcs.Task; } } @@ -529,11 +530,11 @@ namespace Grpc.Core.Internal var status = receivedStatus.Status; if (status.StatusCode != StatusCode.OK) { - streamingCallFinishedTcs.SetException(new RpcException(status)); + streamingResponseCallFinishedTcs.SetException(new RpcException(status)); return; } - streamingCallFinishedTcs.SetResult(null); + streamingResponseCallFinishedTcs.SetResult(null); } } } diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs index ad9423ff58..65bf60269a 100644 --- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs +++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs @@ -73,7 +73,7 @@ namespace Grpc.Core.Internal if (result == null) { - await call.StreamingCallFinishedTask.ConfigureAwait(false); + await call.StreamingResponseCallFinishedTask.ConfigureAwait(false); return false; } return true; diff --git a/src/node/performance/benchmark_client_express.js b/src/node/performance/benchmark_client_express.js new file mode 100644 index 0000000000..675eb5f288 --- /dev/null +++ b/src/node/performance/benchmark_client_express.js @@ -0,0 +1,291 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/** + * Benchmark client module + * @module + */ + +'use strict'; + +var fs = require('fs'); +var path = require('path'); +var util = require('util'); +var EventEmitter = require('events'); +var http = require('http'); +var https = require('https'); + +var async = require('async'); +var _ = require('lodash'); +var PoissonProcess = require('poisson-process'); +var Histogram = require('./histogram'); + +/** + * Convert a time difference, as returned by process.hrtime, to a number of + * nanoseconds. + * @param {Array.<number>} time_diff The time diff, represented as + * [seconds, nanoseconds] + * @return {number} The total number of nanoseconds + */ +function timeDiffToNanos(time_diff) { + return time_diff[0] * 1e9 + time_diff[1]; +} + +function BenchmarkClient(server_targets, channels, histogram_params, + security_params) { + var options = { + method: 'PUT', + headers: { + 'Content-Type': 'application/json' + } + }; + var protocol; + if (security_params) { + var ca_path; + protocol = https; + this.request = _.bind(https.request, https); + if (security_params.use_test_ca) { + ca_path = path.join(__dirname, '../test/data/ca.pem'); + var ca_data = fs.readFileSync(ca_path); + options.ca = ca_data; + } + if (security_params.server_host_override) { + var host_override = security_params.server_host_override; + options.servername = host_override; + } + } else { + protocol = http; + } + + this.request = _.bind(protocol.request, protocol); + + this.client_options = []; + + for (var i = 0; i < channels; i++) { + var host_port; + host_port = server_targets[i % server_targets.length].split(':') + var new_options = _.assign({hostname: host_port[0], port: +host_port[1]}, options); + new_options.agent = new protocol.Agent(new_options); + this.client_options[i] = new_options; + } + + this.histogram = new Histogram(histogram_params.resolution, + histogram_params.max_possible); + + this.running = false; + + this.pending_calls = 0; +} + +util.inherits(BenchmarkClient, EventEmitter); + +function startAllClients(client_options_list, outstanding_rpcs_per_channel, + makeCall, emitter) { + _.each(client_options_list, function(client_options) { + _.times(outstanding_rpcs_per_channel, function() { + makeCall(client_options); + }); + }); +} + +BenchmarkClient.prototype.startClosedLoop = function( + outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) { + var self = this; + + var options = {}; + + self.running = true; + + if (rpc_type == 'UNARY') { + options.path = '/serviceProto.BenchmarkService.service/unaryCall'; + } else { + self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type)); + } + + if (generic) { + self.emit('error', new Error('Generic client not supported')); + } + + self.last_wall_time = process.hrtime(); + + var argument = { + response_size: resp_size, + payload: { + body: '0'.repeat(req_size) + } + }; + + function makeCall(client_options) { + if (self.running) { + self.pending_calls++; + var start_time = process.hrtime(); + var req = self.request(client_options, function(res) { + var res_data = ''; + res.on('data', function(data) { + res_data += data; + }); + res.on('end', function() { + JSON.parse(res_data); + var time_diff = process.hrtime(start_time); + self.histogram.add(timeDiffToNanos(time_diff)); + makeCall(client_options); + self.pending_calls--; + if ((!self.running) && self.pending_calls == 0) { + self.emit('finished'); + } + }); + }); + req.write(JSON.stringify(argument)); + req.end(); + req.on('error', function(error) { + self.emit('error', new Error('Client error: ' + error.message)); + self.running = false; + }); + } + } + + startAllClients(_.map(self.client_options, _.partial(_.assign, options)), + outstanding_rpcs_per_channel, makeCall, self); +}; + +BenchmarkClient.prototype.startPoisson = function( + outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load, + generic) { + var self = this; + + var options = {}; + + self.running = true; + + if (rpc_type == 'UNARY') { + options.path = '/serviceProto.BenchmarkService.service/unaryCall'; + } else { + self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type)); + } + + if (generic) { + self.emit('error', new Error('Generic client not supported')); + } + + self.last_wall_time = process.hrtime(); + + var argument = { + response_size: resp_size, + payload: { + body: '0'.repeat(req_size) + } + }; + + function makeCall(client_options, poisson) { + if (self.running) { + self.pending_calls++; + var start_time = process.hrtime(); + var req = self.request(client_options, function(res) { + var res_data = ''; + res.on('data', function(data) { + res_data += data; + }); + res.on('end', function() { + JSON.parse(res_data); + var time_diff = process.hrtime(start_time); + self.histogram.add(timeDiffToNanos(time_diff)); + self.pending_calls--; + if ((!self.running) && self.pending_calls == 0) { + self.emit('finished'); + } + }); + }); + req.write(JSON.stringify(argument)); + req.end(); + req.on('error', function(error) { + self.emit('error', new Error('Client error: ' + error.message)); + self.running = false; + }); + } else { + poisson.stop(); + } + } + + var averageIntervalMs = (1 / offered_load) * 1000; + + startAllClients(_.map(self.client_options, _.partial(_.assign, options)), + outstanding_rpcs_per_channel, function(opts){ + var p = PoissonProcess.create(averageIntervalMs, function() { + makeCall(opts, p); + }); + p.start(); + }, self); +}; + +/** + * Return curent statistics for the client. If reset is set, restart + * statistic collection. + * @param {boolean} reset Indicates that statistics should be reset + * @return {object} Client statistics + */ +BenchmarkClient.prototype.mark = function(reset) { + var wall_time_diff = process.hrtime(this.last_wall_time); + var histogram = this.histogram; + if (reset) { + this.last_wall_time = process.hrtime(); + this.histogram = new Histogram(histogram.resolution, + histogram.max_possible); + } + + return { + latencies: { + bucket: histogram.getContents(), + min_seen: histogram.minimum(), + max_seen: histogram.maximum(), + sum: histogram.getSum(), + sum_of_squares: histogram.sumOfSquares(), + count: histogram.getCount() + }, + time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9, + // Not sure how to measure these values + time_user: 0, + time_system: 0 + }; +}; + +/** + * Stop the clients. + * @param {function} callback Called when the clients have finished shutting + * down + */ +BenchmarkClient.prototype.stop = function(callback) { + this.running = false; + this.on('finished', callback); +}; + +module.exports = BenchmarkClient; diff --git a/src/node/performance/benchmark_server.js b/src/node/performance/benchmark_server.js index 70cee9979b..6abde2e17a 100644 --- a/src/node/performance/benchmark_server.js +++ b/src/node/performance/benchmark_server.js @@ -40,6 +40,8 @@ var fs = require('fs'); var path = require('path'); +var EventEmitter = require('events'); +var util = require('util'); var genericService = require('./generic_service'); @@ -138,12 +140,15 @@ function BenchmarkServer(host, port, tls, generic, response_size) { this.server = server; } +util.inherits(BenchmarkServer, EventEmitter); + /** * Start the benchmark server. */ BenchmarkServer.prototype.start = function() { this.server.start(); this.last_wall_time = process.hrtime(); + this.emit('started'); }; /** diff --git a/src/node/performance/benchmark_server_express.js b/src/node/performance/benchmark_server_express.js new file mode 100644 index 0000000000..065bcf660b --- /dev/null +++ b/src/node/performance/benchmark_server_express.js @@ -0,0 +1,109 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/** + * Benchmark server module + * @module + */ + +'use strict'; + +var fs = require('fs'); +var path = require('path'); +var http = require('http'); +var https = require('https'); +var EventEmitter = require('events'); +var util = require('util'); + +var express = require('express'); +var bodyParser = require('body-parser') + +function unaryCall(req, res) { + var reqObj = req.body; + var payload = {body: '0'.repeat(reqObj.response_size)}; + res.json(payload); +} + +function BenchmarkServer(host, port, tls, generic, response_size) { + var app = express(); + app.use(bodyParser.json()) + app.put('/serviceProto.BenchmarkService.service/unaryCall', unaryCall); + this.input_host = host; + this.input_port = port; + if (tls) { + var credentials = {}; + var key_path = path.join(__dirname, '../test/data/server1.key'); + var pem_path = path.join(__dirname, '../test/data/server1.pem'); + + var key_data = fs.readFileSync(key_path); + var pem_data = fs.readFileSync(pem_path); + credentials['key'] = key_data; + credentials['cert'] = pem_data; + this.server = https.createServer(credentials, app); + } else { + this.server = http.createServer(app); + } +} + +util.inherits(BenchmarkServer, EventEmitter); + +BenchmarkServer.prototype.start = function() { + var self = this; + this.server.listen(this.input_port, this.input_hostname, function() { + self.last_wall_time = process.hrtime(); + self.emit('started'); + }); +}; + +BenchmarkServer.prototype.getPort = function() { + return this.server.address().port; +}; + +BenchmarkServer.prototype.mark = function(reset) { + var wall_time_diff = process.hrtime(this.last_wall_time); + if (reset) { + this.last_wall_time = process.hrtime(); + } + return { + time_elapsed: wall_time_diff[0] + wall_time_diff[1] / 1e9, + // Not sure how to measure these values + time_user: 0, + time_system: 0 + }; +}; + +BenchmarkServer.prototype.stop = function(callback) { + this.server.close(callback); +}; + +module.exports = BenchmarkServer; diff --git a/src/node/performance/worker.js b/src/node/performance/worker.js index 7ef9b84fe7..030bf7d7ba 100644 --- a/src/node/performance/worker.js +++ b/src/node/performance/worker.js @@ -34,18 +34,18 @@ 'use strict'; var console = require('console'); -var worker_service_impl = require('./worker_service_impl'); +var WorkerServiceImpl = require('./worker_service_impl'); var grpc = require('../../../'); var serviceProto = grpc.load({ root: __dirname + '/../../..', file: 'src/proto/grpc/testing/services.proto'}).grpc.testing; -function runServer(port) { +function runServer(port, benchmark_impl) { var server_creds = grpc.ServerCredentials.createInsecure(); var server = new grpc.Server(); server.addProtoService(serviceProto.WorkerService.service, - worker_service_impl); + new WorkerServiceImpl(benchmark_impl, server)); var address = '0.0.0.0:' + port; server.bind(address, server_creds); server.start(); @@ -57,9 +57,9 @@ if (require.main === module) { Error.stackTraceLimit = Infinity; var parseArgs = require('minimist'); var argv = parseArgs(process.argv, { - string: ['driver_port'] + string: ['driver_port', 'benchmark_impl'] }); - runServer(argv.driver_port); + runServer(argv.driver_port, argv.benchmark_impl); } exports.runServer = runServer; diff --git a/src/node/performance/worker_service_impl.js b/src/node/performance/worker_service_impl.js index 4b5cb8f9c2..3f317f6429 100644 --- a/src/node/performance/worker_service_impl.js +++ b/src/node/performance/worker_service_impl.js @@ -38,121 +38,141 @@ var console = require('console'); var BenchmarkClient = require('./benchmark_client'); var BenchmarkServer = require('./benchmark_server'); -exports.quitWorker = function quitWorker(call, callback) { - callback(null, {}); - process.exit(0); -} +module.exports = function WorkerServiceImpl(benchmark_impl, server) { + var BenchmarkClient; + var BenchmarkServer; + switch (benchmark_impl) { + case 'grpc': + BenchmarkClient = require('./benchmark_client'); + BenchmarkServer = require('./benchmark_server'); + break; + case 'express': + BenchmarkClient = require('./benchmark_client_express'); + BenchmarkServer = require('./benchmark_server_express'); + break; + default: + throw new Error('Unrecognized benchmark impl: ' + benchmark_impl); + } -exports.runClient = function runClient(call) { - var client; - call.on('data', function(request) { - var stats; - switch (request.argtype) { - case 'setup': - var setup = request.setup; - console.log('ClientConfig %j', setup); - client = new BenchmarkClient(setup.server_targets, - setup.client_channels, - setup.histogram_params, - setup.security_params); - client.on('error', function(error) { - call.emit('error', error); - }); - var req_size, resp_size, generic; - switch (setup.payload_config.payload) { - case 'bytebuf_params': - req_size = setup.payload_config.bytebuf_params.req_size; - resp_size = setup.payload_config.bytebuf_params.resp_size; - generic = true; + this.quitWorker = function quitWorker(call, callback) { + server.tryShutdown(function() { + callback(null, {}); + }); + }; + + this.runClient = function runClient(call) { + var client; + call.on('data', function(request) { + var stats; + switch (request.argtype) { + case 'setup': + var setup = request.setup; + console.log('ClientConfig %j', setup); + client = new BenchmarkClient(setup.server_targets, + setup.client_channels, + setup.histogram_params, + setup.security_params); + client.on('error', function(error) { + call.emit('error', error); + }); + var req_size, resp_size, generic; + switch (setup.payload_config.payload) { + case 'bytebuf_params': + req_size = setup.payload_config.bytebuf_params.req_size; + resp_size = setup.payload_config.bytebuf_params.resp_size; + generic = true; + break; + case 'simple_params': + req_size = setup.payload_config.simple_params.req_size; + resp_size = setup.payload_config.simple_params.resp_size; + generic = false; + break; + default: + call.emit('error', new Error('Unsupported PayloadConfig type' + + setup.payload_config.payload)); + } + switch (setup.load_params.load) { + case 'closed_loop': + client.startClosedLoop(setup.outstanding_rpcs_per_channel, + setup.rpc_type, req_size, resp_size, generic); + break; + case 'poisson': + client.startPoisson(setup.outstanding_rpcs_per_channel, + setup.rpc_type, req_size, resp_size, + setup.load_params.poisson.offered_load, generic); + break; + default: + call.emit('error', new Error('Unsupported LoadParams type' + + setup.load_params.load)); + } + stats = client.mark(); + call.write({ + stats: stats + }); break; - case 'simple_params': - req_size = setup.payload_config.simple_params.req_size; - resp_size = setup.payload_config.simple_params.resp_size; - generic = false; + case 'mark': + if (client) { + stats = client.mark(request.mark.reset); + call.write({ + stats: stats + }); + } else { + call.emit('error', new Error('Got Mark before ClientConfig')); + } break; default: - call.emit('error', new Error('Unsupported PayloadConfig type' + - setup.payload_config.payload)); + throw new Error('Nonexistent client argtype option: ' + request.argtype); } - switch (setup.load_params.load) { - case 'closed_loop': - client.startClosedLoop(setup.outstanding_rpcs_per_channel, - setup.rpc_type, req_size, resp_size, generic); + }); + call.on('end', function() { + client.stop(function() { + call.end(); + }); + }); + }; + + this.runServer = function runServer(call) { + var server; + call.on('data', function(request) { + var stats; + switch (request.argtype) { + case 'setup': + console.log('ServerConfig %j', request.setup); + server = new BenchmarkServer('[::]', request.setup.port, + request.setup.security_params); + server.on('started', function() { + stats = server.mark(); + call.write({ + stats: stats, + port: server.getPort() + }); + }); + server.start(); break; - case 'poisson': - client.startPoisson(setup.outstanding_rpcs_per_channel, - setup.rpc_type, req_size, resp_size, - setup.load_params.poisson.offered_load, generic); + case 'mark': + if (server) { + stats = server.mark(request.mark.reset); + call.write({ + stats: stats, + port: server.getPort(), + cores: 1 + }); + } else { + call.emit('error', new Error('Got Mark before ServerConfig')); + } break; default: - call.emit('error', new Error('Unsupported LoadParams type' + - setup.load_params.load)); + throw new Error('Nonexistent server argtype option'); } - stats = client.mark(); - call.write({ - stats: stats - }); - break; - case 'mark': - if (client) { - stats = client.mark(request.mark.reset); - call.write({ - stats: stats - }); - } else { - call.emit('error', new Error('Got Mark before ClientConfig')); - } - break; - default: - throw new Error('Nonexistent client argtype option: ' + request.argtype); - } - }); - call.on('end', function() { - client.stop(function() { - call.end(); }); - }); -}; - -exports.runServer = function runServer(call) { - var server; - call.on('data', function(request) { - var stats; - switch (request.argtype) { - case 'setup': - console.log('ServerConfig %j', request.setup); - server = new BenchmarkServer('[::]', request.setup.port, - request.setup.security_params); - server.start(); - stats = server.mark(); - call.write({ - stats: stats, - port: server.getPort() + call.on('end', function() { + server.stop(function() { + call.end(); }); - break; - case 'mark': - if (server) { - stats = server.mark(request.mark.reset); - call.write({ - stats: stats, - port: server.getPort(), - cores: 1 - }); - } else { - call.emit('error', new Error('Got Mark before ServerConfig')); - } - break; - default: - throw new Error('Nonexistent server argtype option'); - } - }); - call.on('end', function() { - server.stop(function() { - call.end(); }); - }); -}; + }; -exports.coreCount = function coreCount(call, callback) { - callback(null, {cores: os.cpus().length}); + this.coreCount = function coreCount(call, callback) { + callback(null, {cores: os.cpus().length}); + }; }; diff --git a/templates/package.json.template b/templates/package.json.template index e9596d4d4c..2b3d32ec99 100644 --- a/templates/package.json.template +++ b/templates/package.json.template @@ -36,6 +36,8 @@ }, "devDependencies": { "async": "^1.5.0", + "body-parser": "^1.15.2", + "express": "^4.14.0", "google-auth-library": "^0.9.2", "google-protobuf": "^3.0.0", "istanbul": "^0.3.21", diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index ade818db98..bdb3874c21 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -84,8 +84,8 @@ END2END_FIXTURES = { TestOptions = collections.namedtuple( 'TestOptions', - 'needs_fullstack needs_dns proxyable secure traceable cpu_cost exclude_iomgrs large_writes') -default_test_options = TestOptions(False, False, True, False, True, 1.0, [], False) + 'needs_fullstack needs_dns proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky') +default_test_options = TestOptions(False, False, True, False, True, 1.0, [], False, False) connectivity_test_options = default_test_options._replace(needs_fullstack=True) LOWCPU = 0.1 @@ -108,7 +108,7 @@ END2END_TESTS = { proxyable=False, cpu_cost=LOWCPU, exclude_iomgrs=['uv']), 'default_host': default_test_options._replace(needs_fullstack=True, needs_dns=True), - 'disappearing_server': connectivity_test_options, + 'disappearing_server': connectivity_test_options._replace(flaky=True), 'empty_batch': default_test_options, 'filter_causes_close': default_test_options, 'filter_call_init_fails': default_test_options, @@ -263,7 +263,7 @@ def main(): 'ci_platforms': (END2END_FIXTURES[f].platforms if END2END_FIXTURES[f].ci_mac else without( END2END_FIXTURES[f].platforms, 'mac')), - 'flaky': False, + 'flaky': END2END_TESTS[t].flaky, 'language': 'c', 'cpu_cost': END2END_TESTS[t].cpu_cost, } @@ -280,7 +280,7 @@ def main(): 'ci_platforms': (END2END_FIXTURES[f].platforms if END2END_FIXTURES[f].ci_mac else without( END2END_FIXTURES[f].platforms, 'mac')), - 'flaky': False, + 'flaky': END2END_TESTS[t].flaky, 'language': 'c', 'cpu_cost': END2END_TESTS[t].cpu_cost, } diff --git a/test/core/end2end/tests/resource_quota_server.c b/test/core/end2end/tests/resource_quota_server.c index cf81b06692..10ceb962ae 100644 --- a/test/core/end2end/tests/resource_quota_server.c +++ b/test/core/end2end/tests/resource_quota_server.c @@ -340,9 +340,9 @@ void resource_quota_server(grpc_end2end_test_config config) { "Done. %d total calls: %d cancelled at server, %d cancelled at client.", NUM_CALLS, cancelled_calls_on_server, cancelled_calls_on_client); - /* The server call may be cancelled after it's received it's status, but - * before the client does: this means that we should see strictly more - * failures on the client than on the server */ + /* The call may be cancelled after the server has sent its status but before + * the client has received it. This means that we should see strictly more + * failures on the client than on the server. */ GPR_ASSERT(cancelled_calls_on_client >= cancelled_calls_on_server); /* However, we shouldn't see radically more... 0.9 is a guessed bound on what * we'd want that ratio to be... to at least trigger some investigation should diff --git a/test/core/iomgr/resource_quota_test.c b/test/core/iomgr/resource_quota_test.c index 85c6ce6e70..6dc9f8b76a 100644 --- a/test/core/iomgr/resource_quota_test.c +++ b/test/core/iomgr/resource_quota_test.c @@ -553,9 +553,13 @@ static void test_resource_user_stays_allocated_until_memory_released(void) { static void test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_released( void) { - gpr_log(GPR_INFO, "** test_pools_merged_on_resource_user_deletion **"); - grpc_resource_quota *q = - grpc_resource_quota_create("test_pools_merged_on_resource_user_deletion"); + gpr_log(GPR_INFO, + "** " + "test_resource_user_stays_allocated_and_reclaimers_unrun_until_" + "memory_released **"); + grpc_resource_quota *q = grpc_resource_quota_create( + "test_resource_user_stays_allocated_and_reclaimers_unrun_until_memory_" + "released"); grpc_resource_quota_resize(q, 1024); for (int i = 0; i < 10; i++) { grpc_resource_user usr; diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c index d9f9cd748b..9bea229466 100644 --- a/test/core/iomgr/udp_server_test.c +++ b/test/core/iomgr/udp_server_test.c @@ -185,7 +185,7 @@ static void test_receive(int number_of_clients) { /* Create a socket, send a packet to the UDP server. */ clifd = socket(addr->ss_family, SOCK_DGRAM, 0); GPR_ASSERT(clifd >= 0); - GPR_ASSERT(connect(clifd, (struct sockaddr *)&addr, + GPR_ASSERT(connect(clifd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len) == 0); GPR_ASSERT(5 == write(clifd, "hello", 5)); while (g_number_of_reads == number_of_reads_before && diff --git a/test/cpp/qps/client_sync.cc b/test/cpp/qps/client_sync.cc index 0ccf4e270b..f61e80d76b 100644 --- a/test/cpp/qps/client_sync.cc +++ b/test/cpp/qps/client_sync.cc @@ -130,11 +130,8 @@ class SynchronousUnaryClient GRPC_FINAL : public SynchronousClient { grpc::Status s = stub->UnaryCall(&context, request_, &responses_[thread_idx]); entry->set_value((UsageTimer::Now() - start) * 1e9); - if (!s.ok()) { - gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(), - s.error_message().c_str()); - } - return s.ok(); + entry->set_status(s.error_code()); + return true; } }; diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh index b786c479f3..54e2fe5347 100755 --- a/tools/run_tests/build_python.sh +++ b/tools/run_tests/build_python.sh @@ -39,6 +39,14 @@ cd $(dirname $0)/../.. PLATFORM=`uname -s` +function is_msys() { + if [ "${PLATFORM/MSYS}" != "$PLATFORM" ]; then + echo true + else + exit 1 + fi +} + function is_mingw() { if [ "${PLATFORM/MINGW}" != "$PLATFORM" ]; then echo true @@ -108,6 +116,12 @@ VENV=${2:-$(venv $PYTHON)} VENV_RELATIVE_PYTHON=${3:-$(venv_relative_python)} TOOLCHAIN=${4:-$(toolchain)} +if [ $(is_msys) ]; then + echo "MSYS doesn't directly provide the right compiler(s);" + echo "switch to a MinGW shell." + exit 1 +fi + ROOT=`pwd` export CFLAGS="-I$ROOT/include -std=gnu99 -fno-wrapv $CFLAGS" export GRPC_PYTHON_BUILD_WITH_CYTHON=1 diff --git a/tools/run_tests/performance/scenario_config.py b/tools/run_tests/performance/scenario_config.py index 47774a1f1e..4e4c16642b 100644 --- a/tools/run_tests/performance/scenario_config.py +++ b/tools/run_tests/performance/scenario_config.py @@ -232,6 +232,15 @@ class CXXLanguage: secure=secure, categories=smoketest_categories + [SCALABLE]) + yield _ping_pong_scenario( + 'cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_%s' % secstr, + rpc_type='STREAMING', + client_type='ASYNC_CLIENT', + server_type='SYNC_SERVER', + unconstrained_client='async', + secure=secure, + categories=smoketest_categories+[SCALABLE]) + for rpc_type in ['unary', 'streaming']: for synchronicity in ['sync', 'async']: yield _ping_pong_scenario( @@ -361,7 +370,8 @@ class NodeLanguage: self.safename = str(self) def worker_cmdline(self): - return ['tools/run_tests/performance/run_worker_node.sh'] + return ['tools/run_tests/performance/run_worker_node.sh', + '--benchmark_impl=grpc'] def worker_port_offset(self): return 200 @@ -664,11 +674,69 @@ class GoLanguage: def __str__(self): return 'go' +class NodeExpressLanguage: + + def __init__(self): + pass + self.safename = str(self) + + def worker_cmdline(self): + return ['tools/run_tests/performance/run_worker_node.sh', + '--benchmark_impl=express'] + + def worker_port_offset(self): + return 700 + + def scenarios(self): + # TODO(jtattermusch): make this scenario work + #yield _ping_pong_scenario( + # 'node_generic_async_streaming_ping_pong', rpc_type='STREAMING', + # client_type='ASYNC_CLIENT', server_type='ASYNC_GENERIC_SERVER', + # use_generic_payload=True) + + # TODO(jtattermusch): make this scenario work + #yield _ping_pong_scenario( + # 'node_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', + # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER') + + yield _ping_pong_scenario( + 'node_protobuf_unary_ping_pong', rpc_type='UNARY', + client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + categories=[SCALABLE, SMOKETEST]) + + yield _ping_pong_scenario( + 'node_protobuf_async_unary_qps_unconstrained', rpc_type='UNARY', + client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + unconstrained_client='async', + categories=[SCALABLE, SMOKETEST]) + + # TODO(jtattermusch): make this scenario work + #yield _ping_pong_scenario( + # 'node_protobuf_async_streaming_qps_unconstrained', rpc_type='STREAMING', + # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + # unconstrained_client='async') + + # TODO(jtattermusch): make this scenario work + #yield _ping_pong_scenario( + # 'node_to_cpp_protobuf_async_unary_ping_pong', rpc_type='UNARY', + # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + # server_language='c++', server_core_limit=1, async_server_threads=1) + + # TODO(jtattermusch): make this scenario work + #yield _ping_pong_scenario( + # 'node_to_cpp_protobuf_async_streaming_ping_pong', rpc_type='STREAMING', + # client_type='ASYNC_CLIENT', server_type='ASYNC_SERVER', + # server_language='c++', server_core_limit=1, async_server_threads=1) + + def __str__(self): + return 'node_express' + LANGUAGES = { 'c++' : CXXLanguage(), 'csharp' : CSharpLanguage(), 'node' : NodeLanguage(), + 'node_express': NodeExpressLanguage(), 'ruby' : RubyLanguage(), 'java' : JavaLanguage(), 'python' : PythonLanguage(), diff --git a/tools/run_tests/run_performance_tests.py b/tools/run_tests/run_performance_tests.py index 099ab89ddf..7cb827ecea 100755 --- a/tools/run_tests/run_performance_tests.py +++ b/tools/run_tests/run_performance_tests.py @@ -42,6 +42,7 @@ import os import performance.scenario_config as scenario_config import pipes import re +import report_utils import subprocess import sys import tempfile @@ -453,6 +454,7 @@ if not scenarios: total_scenario_failures = 0 qps_workers_killed = 0 +merged_resultset = {} for scenario in scenarios: if args.dry_run: print(scenario.name) @@ -460,14 +462,20 @@ for scenario in scenarios: try: for worker in scenario.workers: worker.start() - scenario_failures, _ = jobset.run([scenario.jobspec, - create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)], - newline_on_success=True, maxjobs=1) + scenario_failures, resultset = jobset.run([scenario.jobspec, + create_quit_jobspec(scenario.workers, remote_host=args.remote_driver_host)], + newline_on_success=True, maxjobs=1) total_scenario_failures += scenario_failures + merged_resultset = dict(itertools.chain(merged_resultset.iteritems(), + resultset.iteritems())) finally: # Consider qps workers that need to be killed as failures qps_workers_killed += finish_qps_workers(scenario.workers) + +report_utils.render_junit_xml_report(merged_resultset, 'report.xml', + suite_name='benchmarks') + if total_scenario_failures > 0 or qps_workers_killed > 0: print ("%s scenarios failed and %s qps worker jobs killed" % (total_scenario_failures, qps_workers_killed)) sys.exit(1) diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py index db261d98c2..2886870d38 100755 --- a/tools/run_tests/run_tests.py +++ b/tools/run_tests/run_tests.py @@ -842,6 +842,53 @@ class Sanity(object): def __str__(self): return 'sanity' +class NodeExpressLanguage(object): + """Dummy Node express test target to enable running express performance + benchmarks""" + + def __init__(self): + self.platform = platform_string() + + def configure(self, config, args): + self.config = config + self.args = args + _check_compiler(self.args.compiler, ['default', 'node0.12', + 'node4', 'node5', 'node6']) + if self.args.compiler == 'default': + self.node_version = '4' + else: + # Take off the word "node" + self.node_version = self.args.compiler[4:] + + def test_specs(self): + return [] + + def pre_build_steps(self): + if self.platform == 'windows': + return [['tools\\run_tests\\pre_build_node.bat']] + else: + return [['tools/run_tests/pre_build_node.sh', self.node_version]] + + def make_targets(self): + return [] + + def make_options(self): + return [] + + def build_steps(self): + return [] + + def post_tests_steps(self): + return [] + + def makefile_name(self): + return 'Makefile' + + def dockerfile_dir(self): + return 'tools/dockerfile/test/node_jessie_%s' % _docker_arch_suffix(self.args.arch) + + def __str__(self): + return 'node_express' # different configurations we can run under with open('tools/run_tests/configs.json') as f: @@ -852,6 +899,7 @@ _LANGUAGES = { 'c++': CLanguage('cxx', 'c++'), 'c': CLanguage('c', 'c'), 'node': NodeLanguage(), + 'node_express': NodeExpressLanguage(), 'php': PhpLanguage(), 'php7': Php7Language(), 'python': PythonLanguage(), diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json index 68d99eed51..c3715995d2 100644 --- a/tools/run_tests/tests.json +++ b/tools/run_tests/tests.json @@ -5156,7 +5156,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_census_test", "platforms": [ @@ -6193,7 +6193,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_compress_test", "platforms": [ @@ -7230,7 +7230,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_fake_resolver_test", "platforms": [ @@ -8254,7 +8254,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_fakesec_test", "platforms": [ @@ -10179,7 +10179,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_full_test", "platforms": [ @@ -11165,7 +11165,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_full+pipe_test", "platforms": [ @@ -12071,7 +12071,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_full+trace_test", "platforms": [ @@ -13073,7 +13073,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_http_proxy_test", "platforms": [ @@ -14142,7 +14142,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_load_reporting_test", "platforms": [ @@ -15190,7 +15190,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_oauth2_test", "platforms": [ @@ -16222,7 +16222,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_proxy_test", "platforms": [ @@ -20033,7 +20033,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_ssl_test", "platforms": [ @@ -21070,7 +21070,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_ssl_cert_test", "platforms": [ @@ -22070,7 +22070,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_ssl_proxy_test", "platforms": [ @@ -22995,7 +22995,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_uds_test", "platforms": [ @@ -24008,7 +24008,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_census_nosec_test", "platforms": [ @@ -25022,7 +25022,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_compress_nosec_test", "platforms": [ @@ -26036,7 +26036,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_fake_resolver_nosec_test", "platforms": [ @@ -27947,7 +27947,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_full_nosec_test", "platforms": [ @@ -28914,7 +28914,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_full+pipe_nosec_test", "platforms": [ @@ -29797,7 +29797,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_full+trace_nosec_test", "platforms": [ @@ -30775,7 +30775,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_http_proxy_nosec_test", "platforms": [ @@ -31821,7 +31821,7 @@ "cpu_cost": 1.0, "exclude_configs": [], "exclude_iomgrs": [], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_load_reporting_nosec_test", "platforms": [ @@ -32797,7 +32797,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_proxy_nosec_test", "platforms": [ @@ -36487,7 +36487,7 @@ "exclude_iomgrs": [ "uv" ], - "flaky": false, + "flaky": true, "language": "c", "name": "h2_uds_nosec_test", "platforms": [ @@ -37319,6 +37319,27 @@ { "args": [ "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" + ], + "boringssl": true, + "ci_platforms": [ + "linux" + ], + "cpu_cost": 8, + "defaults": "boringssl", + "exclude_configs": [], + "flaky": false, + "language": "c++", + "name": "json_run_localhost", + "platforms": [ + "linux" + ], + "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_secure", + "timeout_seconds": 180 + }, + { + "args": [ + "--scenarios_json", "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_secure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": {\"use_test_ca\": true, \"server_host_override\": \"foo.test.google.fr\"}, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, @@ -37655,6 +37676,27 @@ { "args": [ "--scenarios_json", + "{\"scenarios\": [{\"name\": \"cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 0, \"core_limit\": 0, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"ASYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 64, \"async_client_threads\": 0, \"outstanding_rpcs_per_channel\": 100, \"rpc_type\": \"STREAMING\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 0}]}" + ], + "boringssl": true, + "ci_platforms": [ + "linux" + ], + "cpu_cost": 8, + "defaults": "boringssl", + "exclude_configs": [], + "flaky": false, + "language": "c++", + "name": "json_run_localhost", + "platforms": [ + "linux" + ], + "shortname": "json_run_localhost:cpp_protobuf_async_client_sync_server_streaming_qps_unconstrained_insecure", + "timeout_seconds": 180 + }, + { + "args": [ + "--scenarios_json", "{\"scenarios\": [{\"name\": \"cpp_protobuf_sync_unary_ping_pong_insecure\", \"warmup_seconds\": 0, \"benchmark_seconds\": 1, \"num_servers\": 1, \"server_config\": {\"async_server_threads\": 1, \"core_limit\": 1, \"security_params\": null, \"server_type\": \"SYNC_SERVER\"}, \"client_config\": {\"client_type\": \"SYNC_CLIENT\", \"security_params\": null, \"payload_config\": {\"simple_params\": {\"resp_size\": 0, \"req_size\": 0}}, \"client_channels\": 1, \"async_client_threads\": 1, \"outstanding_rpcs_per_channel\": 1, \"rpc_type\": \"UNARY\", \"load_params\": {\"closed_loop\": {}}, \"histogram_params\": {\"max_possible\": 60000000000.0, \"resolution\": 0.01}}, \"num_clients\": 1}]}" ], "boringssl": true, |